From ebf2b71832e123c92a3815cf2ec12dff689b788c Mon Sep 17 00:00:00 2001 From: Mshehu5 Date: Mon, 16 Feb 2026 14:31:08 +0100 Subject: [PATCH 1/4] feat: add SQLite database for payjoin session persistence - Create db.rs with Database, SenderPersister and ReceiverPersister - Implement SessionPersister trait for both sender and receiver - Add session ID management and query methods - Add input deduplication tracking to prevent probing attacks - Add timestamp formatting utilities --- src/error.rs | 65 ++++++++ src/payjoin/db.rs | 397 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 462 insertions(+) create mode 100644 src/payjoin/db.rs diff --git a/src/error.rs b/src/error.rs index 3690d4f..9244096 100644 --- a/src/error.rs +++ b/src/error.rs @@ -140,6 +140,10 @@ pub enum BDKCliError { #[cfg(feature = "payjoin")] #[error("Payjoin create request error: {0}")] PayjoinCreateRequest(#[from] payjoin::send::v2::CreateRequestError), + + #[cfg(feature = "payjoin")] + #[error("Payjoin database error: {0}")] + PayjoinDb(#[from] PayjoinDbError), } impl From for BDKCliError { @@ -168,3 +172,64 @@ impl From for BDKCliError { BDKCliError::RusqliteError(Box::new(err)) } } + +/// Error type for payjoin database operations +#[cfg(feature = "payjoin")] +#[derive(Debug)] +pub enum PayjoinDbError { + /// SQLite database error + Rusqlite(bdk_wallet::rusqlite::Error), + /// JSON serialization error + Serialize(serde_json::Error), + /// JSON deserialization error + Deserialize(serde_json::Error), +} + +#[cfg(feature = "payjoin")] +impl std::fmt::Display for PayjoinDbError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PayjoinDbError::Rusqlite(e) => write!(f, "Database operation failed: {e}"), + PayjoinDbError::Serialize(e) => write!(f, "Serialization failed: {e}"), + PayjoinDbError::Deserialize(e) => write!(f, "Deserialization failed: {e}"), + } + } +} + +#[cfg(feature = "payjoin")] +impl std::error::Error for PayjoinDbError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + PayjoinDbError::Rusqlite(e) => Some(e), + PayjoinDbError::Serialize(e) => Some(e), + PayjoinDbError::Deserialize(e) => Some(e), + } + } +} + +#[cfg(feature = "payjoin")] +impl From for PayjoinDbError { + fn from(error: bdk_wallet::rusqlite::Error) -> Self { + PayjoinDbError::Rusqlite(error) + } +} + +#[cfg(feature = "payjoin")] +impl From for payjoin::ImplementationError { + fn from(error: PayjoinDbError) -> Self { + payjoin::ImplementationError::new(error) + } +} + +#[cfg(feature = "payjoin")] +impl + From> for BDKCliError +where + ApiErr: std::error::Error, + StorageErr: std::error::Error, + ErrorState: std::fmt::Debug, +{ + fn from(e: payjoin::persist::PersistedError) -> Self { + BDKCliError::Generic(e.to_string()) + } +} diff --git a/src/payjoin/db.rs b/src/payjoin/db.rs new file mode 100644 index 0000000..3525e44 --- /dev/null +++ b/src/payjoin/db.rs @@ -0,0 +1,397 @@ +use std::fmt; +use std::path::Path; +use std::sync::{Arc, Mutex, MutexGuard}; + +use bdk_wallet::rusqlite::{Connection, params}; +use payjoin::HpkePublicKey; +use payjoin::bitcoin::OutPoint; +use payjoin::bitcoin::consensus::encode::serialize; +use payjoin::persist::SessionPersister; +use payjoin::receive::v2::SessionEvent as ReceiverSessionEvent; +use payjoin::send::v2::SessionEvent as SenderSessionEvent; + +use crate::error::PayjoinDbError as Error; + +pub type Result = std::result::Result; + +/// Default filename for the payjoin database +pub const DB_FILENAME: &str = "payjoin.sqlite"; + +/// Returns the current Unix timestamp in seconds +#[inline] +fn now() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64 +} + +pub struct Database { + conn: Mutex, +} + +impl Database { + pub fn create(path: impl AsRef) -> Result { + let conn = Connection::open(path.as_ref())?; + Self::init_schema(&conn)?; + Ok(Self { + conn: Mutex::new(conn), + }) + } + + fn conn(&self) -> MutexGuard<'_, Connection> { + self.conn + .lock() + .expect("Database mutex should not be poisoned") + } + + fn init_schema(conn: &Connection) -> Result<()> { + conn.execute("PRAGMA foreign_keys = ON", [])?; + + conn.execute( + "CREATE TABLE IF NOT EXISTS send_sessions ( + session_id INTEGER PRIMARY KEY AUTOINCREMENT, + receiver_pubkey BLOB NOT NULL, + completed_at INTEGER + )", + [], + )?; + + conn.execute( + "CREATE TABLE IF NOT EXISTS receive_sessions ( + session_id INTEGER PRIMARY KEY AUTOINCREMENT, + completed_at INTEGER + )", + [], + )?; + + conn.execute( + "CREATE TABLE IF NOT EXISTS send_session_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id INTEGER NOT NULL, + event_data TEXT NOT NULL, + created_at INTEGER NOT NULL, + FOREIGN KEY(session_id) REFERENCES send_sessions(session_id) + )", + [], + )?; + + conn.execute( + "CREATE TABLE IF NOT EXISTS receive_session_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id INTEGER NOT NULL, + event_data TEXT NOT NULL, + created_at INTEGER NOT NULL, + FOREIGN KEY(session_id) REFERENCES receive_sessions(session_id) + )", + [], + )?; + + conn.execute( + "CREATE TABLE IF NOT EXISTS inputs_seen ( + outpoint BLOB PRIMARY KEY, + created_at INTEGER NOT NULL + )", + [], + )?; + + Ok(()) + } + + /// Inserts the input and returns true if the input was seen before, false otherwise. + /// This is used for replay protection to prevent probing attacks. + pub fn insert_input_seen_before(&self, input: OutPoint) -> Result { + let key = serialize(&input); + let was_seen_before = self.conn().execute( + "INSERT OR IGNORE INTO inputs_seen (outpoint, created_at) VALUES (?1, ?2)", + params![key, now()], + )? == 0; + Ok(was_seen_before) + } + + /// Returns IDs of all active (incomplete) receive sessions + pub fn get_recv_session_ids(&self) -> Result> { + let conn = self.conn(); + let mut stmt = + conn.prepare("SELECT session_id FROM receive_sessions WHERE completed_at IS NULL ORDER BY session_id DESC")?; + + let session_rows = stmt.query_map([], |row| { + let session_id: i64 = row.get(0)?; + Ok(SessionId(session_id)) + })?; + + let mut session_ids = Vec::new(); + for session_row in session_rows { + session_ids.push(session_row?); + } + + Ok(session_ids) + } + + /// Returns IDs of all active (incomplete) send sessions + pub fn get_send_session_ids(&self) -> Result> { + let conn = self.conn(); + let mut stmt = + conn.prepare("SELECT session_id FROM send_sessions WHERE completed_at IS NULL ORDER BY session_id DESC")?; + + let session_rows = stmt.query_map([], |row| { + let session_id: i64 = row.get(0)?; + Ok(SessionId(session_id)) + })?; + + let mut session_ids = Vec::new(); + for session_row in session_rows { + session_ids.push(session_row?); + } + + Ok(session_ids) + } + + /// Returns the receiver public key for a send session + pub fn get_send_session_receiver_pk(&self, session_id: &SessionId) -> Result { + let conn = self.conn(); + let mut stmt = + conn.prepare("SELECT receiver_pubkey FROM send_sessions WHERE session_id = ?1")?; + let receiver_pubkey: Vec = stmt.query_row(params![session_id.0], |row| row.get(0))?; + Ok(HpkePublicKey::from_compressed_bytes(&receiver_pubkey).expect("Valid receiver pubkey")) + } + + /// Returns IDs and completion timestamps of all completed send sessions + pub fn get_inactive_send_session_ids(&self) -> Result> { + let conn = self.conn(); + let mut stmt = conn.prepare( + "SELECT session_id, completed_at FROM send_sessions WHERE completed_at IS NOT NULL", + )?; + let session_rows = stmt.query_map([], |row| { + let session_id: i64 = row.get(0)?; + let completed_at: u64 = row.get(1)?; + Ok((SessionId(session_id), completed_at)) + })?; + + let mut session_ids = Vec::new(); + for session_row in session_rows { + session_ids.push(session_row?); + } + Ok(session_ids) + } + + /// Returns IDs and completion timestamps of all completed receive sessions + pub fn get_inactive_recv_session_ids(&self) -> Result> { + let conn = self.conn(); + let mut stmt = conn.prepare( + "SELECT session_id, completed_at FROM receive_sessions WHERE completed_at IS NOT NULL", + )?; + let session_rows = stmt.query_map([], |row| { + let session_id: i64 = row.get(0)?; + let completed_at: u64 = row.get(1)?; + Ok((SessionId(session_id), completed_at)) + })?; + + let mut session_ids = Vec::new(); + for session_row in session_rows { + session_ids.push(session_row?); + } + Ok(session_ids) + } + + /// Formats a Unix timestamp into local date time text. + pub fn format_unix_timestamp(&self, timestamp: u64) -> Result { + let Ok(timestamp) = i64::try_from(timestamp) else { + return Ok(format!("Invalid timestamp ({timestamp})")); + }; + let conn = self.conn(); + let dt: Option = conn.query_row( + "SELECT datetime(?1, 'unixepoch', 'localtime')", + params![timestamp], + |row| row.get(0), + )?; + Ok(dt.unwrap_or_else(|| format!("Invalid timestamp ({timestamp})"))) + } +} + +/// Wrapper type for session IDs +#[derive(Debug, Clone)] +pub struct SessionId(i64); + +impl core::ops::Deref for SessionId { + type Target = i64; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl fmt::Display for SessionId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl SessionId { + pub fn as_i64(&self) -> i64 { + self.0 + } +} + +/// Persister for payjoin v2 send sessions +#[derive(Clone)] +pub struct SenderPersister { + db: Arc, + session_id: SessionId, +} + +impl SenderPersister { + /// Creates a new sender persister, creating a new session in the database + pub fn new(db: Arc, receiver_pubkey: HpkePublicKey) -> Result { + let session_id: i64 = db.conn().query_row( + "INSERT INTO send_sessions (session_id, receiver_pubkey) VALUES (NULL, ?1) RETURNING session_id", + params![receiver_pubkey.to_compressed_bytes()], + |row| row.get(0), + )?; + + Ok(Self { + db, + session_id: SessionId(session_id), + }) + } + + /// Creates a persister from an existing session ID + pub fn from_id(db: Arc, id: SessionId) -> Self { + Self { db, session_id: id } + } +} + +impl SessionPersister for SenderPersister { + type SessionEvent = SenderSessionEvent; + type InternalStorageError = Error; + + fn save_event( + &self, + event: SenderSessionEvent, + ) -> std::result::Result<(), Self::InternalStorageError> { + let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?; + + self.db.conn().execute( + "INSERT INTO send_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)", + params![*self.session_id, event_data, now()], + )?; + + Ok(()) + } + + fn load( + &self, + ) -> std::result::Result>, Self::InternalStorageError> + { + let conn = self.db.conn(); + let mut stmt = conn.prepare( + "SELECT event_data FROM send_session_events WHERE session_id = ?1 ORDER BY id ASC", + )?; + + let event_rows = stmt.query_map(params![*self.session_id], |row| { + let event_data: String = row.get(0)?; + Ok(event_data) + })?; + + let events: Vec = event_rows + .map(|row| { + let event_data = row.expect("Failed to read event data from database"); + serde_json::from_str::(&event_data) + .expect("Database corruption: failed to deserialize session event") + }) + .collect(); + + Ok(Box::new(events.into_iter())) + } + + fn close(&self) -> std::result::Result<(), Self::InternalStorageError> { + self.db.conn().execute( + "UPDATE send_sessions SET completed_at = ?1 WHERE session_id = ?2", + params![now(), *self.session_id], + )?; + + Ok(()) + } +} + +/// Persister for payjoin v2 receive sessions +#[derive(Clone)] +pub struct ReceiverPersister { + db: Arc, + session_id: SessionId, +} + +impl ReceiverPersister { + /// Creates a new receiver persister, creating a new session in the database + pub fn new(db: Arc) -> Result { + let session_id: i64 = db.conn().query_row( + "INSERT INTO receive_sessions (session_id) VALUES (NULL) RETURNING session_id", + [], + |row| row.get(0), + )?; + + Ok(Self { + db, + session_id: SessionId(session_id), + }) + } + + /// Creates a persister from an existing session ID + pub fn from_id(db: Arc, id: SessionId) -> Self { + Self { db, session_id: id } + } +} + +impl SessionPersister for ReceiverPersister { + type SessionEvent = ReceiverSessionEvent; + type InternalStorageError = Error; + + fn save_event( + &self, + event: ReceiverSessionEvent, + ) -> std::result::Result<(), Self::InternalStorageError> { + let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?; + + self.db.conn().execute( + "INSERT INTO receive_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)", + params![*self.session_id, event_data, now()], + )?; + + Ok(()) + } + + fn load( + &self, + ) -> std::result::Result< + Box>, + Self::InternalStorageError, + > { + let conn = self.db.conn(); + let mut stmt = conn.prepare( + "SELECT event_data FROM receive_session_events WHERE session_id = ?1 ORDER BY id ASC", + )?; + + let event_rows = stmt.query_map(params![*self.session_id], |row| { + let event_data: String = row.get(0)?; + Ok(event_data) + })?; + + let events: Vec = event_rows + .map(|row| { + let event_data = row.expect("Failed to read event data from database"); + serde_json::from_str::(&event_data) + .expect("Database corruption: failed to deserialize session event") + }) + .collect(); + + Ok(Box::new(events.into_iter())) + } + + fn close(&self) -> std::result::Result<(), Self::InternalStorageError> { + self.db.conn().execute( + "UPDATE receive_sessions SET completed_at = ?1 WHERE session_id = ?2", + params![now(), *self.session_id], + )?; + + Ok(()) + } +} From 711fa1eb332b243c8d46a7983302b9e20a145fc1 Mon Sep 17 00:00:00 2001 From: Mshehu5 Date: Mon, 16 Feb 2026 14:48:33 +0100 Subject: [PATCH 2/4] feat: integrate session persistence into payjoin workflow - Add database initialization in handlers - Replace NoopSessionPersister with real persisters - Implement session resumption for existing sessions - Add input-seen-before tracking in receiver flow - Remove verbose error wrapping (use ? operator) --- src/handlers.rs | 53 +++++++++-- src/payjoin/mod.rs | 216 +++++++++++++++++++++++++++++---------------- 2 files changed, 185 insertions(+), 84 deletions(-) diff --git a/src/handlers.rs b/src/handlers.rs index 2138e4c..2c614b7 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -594,6 +594,16 @@ pub fn handle_offline_wallet_subcommand( } } +#[cfg(feature = "payjoin")] +pub fn open_payjoin_db( + datadir: Option, +) -> Result, Error> { + use crate::payjoin::db::{DB_FILENAME, Database}; + let home_dir = prepare_home_dir(datadir)?; + let db_path = home_dir.join(DB_FILENAME); + Ok(std::sync::Arc::new(Database::create(&db_path)?)) +} + /// Execute an online wallet sub-command /// /// Online wallet sub-commands are described in [`OnlineWalletSubCommand`]. @@ -607,6 +617,7 @@ pub(crate) async fn handle_online_wallet_subcommand( wallet: &mut Wallet, client: &BlockchainClient, online_subcommand: OnlineWalletSubCommand, + datadir: Option, ) -> Result { match online_subcommand { FullScan { @@ -724,7 +735,8 @@ pub(crate) async fn handle_online_wallet_subcommand( max_fee_rate, } => { let relay_manager = Arc::new(Mutex::new(RelayManager::new())); - let mut payjoin_manager = PayjoinManager::new(wallet, relay_manager); + let db = open_payjoin_db(datadir.clone())?; + let mut payjoin_manager = PayjoinManager::new(wallet, relay_manager, db); return payjoin_manager .receive_payjoin(amount, directory, max_fee_rate, ohttp_relay, client) .await; @@ -735,11 +747,25 @@ pub(crate) async fn handle_online_wallet_subcommand( fee_rate, } => { let relay_manager = Arc::new(Mutex::new(RelayManager::new())); - let mut payjoin_manager = PayjoinManager::new(wallet, relay_manager); + let db = open_payjoin_db(datadir.clone())?; + let mut payjoin_manager = PayjoinManager::new(wallet, relay_manager, db); return payjoin_manager .send_payjoin(uri, fee_rate, ohttp_relay, client) .await; } + ResumePayjoin { + directory, + ohttp_relay, + session_id, + } => { + let relay_manager = Arc::new(Mutex::new(RelayManager::new())); + let db = open_payjoin_db(datadir)?; + let mut payjoin_manager = PayjoinManager::new(wallet, relay_manager, db); + return payjoin_manager + .resume_payjoins(directory, ohttp_relay, session_id, client) + .await; + } + PayjoinHistory => crate::payjoin::PayjoinManager::history(datadir), } } @@ -1210,7 +1236,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result { wallet, subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand), } => { - let home_dir = prepare_home_dir(cli_opts.datadir)?; + let home_dir = prepare_home_dir(cli_opts.datadir.clone())?; let (wallet_opts, network) = load_wallet_config(&home_dir, &wallet)?; @@ -1249,6 +1275,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result { &mut wallet, &blockchain_client, online_subcommand, + cli_opts.datadir.clone(), ) .await?; wallet.persist(&mut persister)?; @@ -1259,8 +1286,13 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result { let mut wallet = new_wallet(network, wallet_opts)?; let blockchain_client = crate::utils::new_blockchain_client(wallet_opts, &wallet, database_path)?; - handle_online_wallet_subcommand(&mut wallet, &blockchain_client, online_subcommand) - .await? + handle_online_wallet_subcommand( + &mut wallet, + &blockchain_client, + online_subcommand, + cli_opts.datadir.clone(), + ) + .await? }; Ok(result) } @@ -1453,9 +1485,14 @@ async fn respond( } => { let blockchain = new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?; - let value = handle_online_wallet_subcommand(wallet, &blockchain, online_subcommand) - .await - .map_err(|e| e.to_string())?; + let value = handle_online_wallet_subcommand( + wallet, + &blockchain, + online_subcommand, + cli_opts.datadir.clone(), + ) + .await + .map_err(|e| e.to_string())?; Some(value) } ReplSubCommand::Wallet { diff --git a/src/payjoin/mod.rs b/src/payjoin/mod.rs index 19beb7d..319b8d9 100644 --- a/src/payjoin/mod.rs +++ b/src/payjoin/mod.rs @@ -1,29 +1,34 @@ use crate::error::BDKCliError as Error; -use crate::handlers::{broadcast_transaction, sync_wallet}; +use crate::handlers::{broadcast_transaction, open_payjoin_db, sync_wallet}; use crate::utils::BlockchainClient; use bdk_wallet::{ SignOptions, Wallet, bitcoin::{FeeRate, Psbt, Txid, consensus::encode::serialize_hex}, }; +use cli_table::{Cell, CellStruct, Style, Table}; use payjoin::bitcoin::TxIn; use payjoin::persist::{OptionalTransitionOutcome, SessionPersister}; use payjoin::receive::InputPair; use payjoin::receive::v2::{ HasReplyableError, Initialized, MaybeInputsOwned, MaybeInputsSeen, Monitor, OutputsUnknown, PayjoinProposal, ProvisionalProposal, ReceiveSession, Receiver, - SessionEvent as ReceiverSessionEvent, UncheckedOriginalPayload, WantsFeeRange, WantsInputs, - WantsOutputs, + SessionEvent as ReceiverSessionEvent, SessionOutcome as ReceiverSessionOutcome, + UncheckedOriginalPayload, WantsFeeRange, WantsInputs, WantsOutputs, + replay_event_log as replay_receiver_event_log, }; use payjoin::send::v2::{ PollingForProposal, SendSession, Sender, SessionEvent as SenderSessionEvent, SessionOutcome as SenderSessionOutcome, WithReplyKey, + replay_event_log as replay_sender_event_log, }; -use payjoin::{ImplementationError, UriExt}; +use payjoin::{HpkePublicKey, ImplementationError, UriExt}; use serde_json::{json, to_string_pretty}; use std::sync::{Arc, Mutex}; +use crate::payjoin::db::{ReceiverPersister, SenderPersister}; use crate::payjoin::ohttp::{RelayManager, fetch_ohttp_keys}; +pub mod db; pub mod ohttp; /// Implements all of the functions required to go through the Payjoin receive and send processes. @@ -35,13 +40,20 @@ pub mod ohttp; pub(crate) struct PayjoinManager<'a> { wallet: &'a mut Wallet, relay_manager: Arc>, + db: Arc, +} } impl<'a> PayjoinManager<'a> { - pub fn new(wallet: &'a mut Wallet, relay_manager: Arc>) -> Self { + pub fn new( + wallet: &'a mut Wallet, + relay_manager: Arc>, + db: Arc, + ) -> Self { Self { wallet, relay_manager, + db, } } @@ -71,8 +83,8 @@ impl<'a> PayjoinManager<'a> { let ohttp_keys = fetch_ohttp_keys(ohttp_relays, &directory, self.relay_manager.clone()).await?; - // TODO: Implement proper persister. - let persister = payjoin::persist::NoopSessionPersister::::default(); + + let persister = crate::payjoin::db::ReceiverPersister::new(self.db.clone())?; let checked_max_fee_rate = max_fee_rate .map(FeeRate::from_sat_per_kwu) @@ -161,7 +173,7 @@ impl<'a> PayjoinManager<'a> { self.process_payjoin_proposal(psbt, blockchain_client) .await? } - payjoin::PjParam::V2(_) => { + payjoin::PjParam::V2(v2_param) => { let ohttp_relays: Vec = ohttp_relays .into_iter() .map(|s| url::Url::parse(&s)) @@ -176,9 +188,47 @@ impl<'a> PayjoinManager<'a> { )); } - // TODO: Implement proper persister. - let persister = - payjoin::persist::NoopSessionPersister::::default(); + use payjoin::send::v2::replay_event_log as replay_sender_event_log; + + // Check for existing session with the same receiver pubkey + let receiver_pubkey = v2_param.receiver_pubkey(); + let existing_session = self + .db + .get_send_session_ids() + .map_err(|e| Error::Generic(format!("{e}")))? + .into_iter() + .find_map(|session_id| { + let session_receiver_pubkey = self + .db + .get_send_session_receiver_pk(&session_id) + .expect("Receiver pubkey should exist if session id exists"); + if session_receiver_pubkey == *receiver_pubkey { + let sender_persister = + SenderPersister::from_id(self.db.clone(), session_id); + let (send_session, _) = replay_sender_event_log(&sender_persister) + .map_err(|e| { + Error::Generic(format!( + "Failed to replay sender event log: {:?}", + e + )) + }) + .ok()?; + Some((send_session, sender_persister)) + } else { + None + } + }); + + let (sender_state, persister) = if let Some((sender_state, persister)) = + existing_session + { + println!("Resuming existing sender session"); + (sender_state, persister) + } else { + let persister = { + let receiver_pubkey: HpkePublicKey = v2_param.receiver_pubkey().clone(); + SenderPersister::new(self.db.clone(), receiver_pubkey)? + }; let sender = payjoin::send::v2::SenderBuilder::new(original_psbt.clone(), uri) .build_recommended(fee_rate)? @@ -189,15 +239,13 @@ impl<'a> PayjoinManager<'a> { )) })?; - let selected_relay = - fetch_ohttp_keys(ohttp_relays, &sender.endpoint(), self.relay_manager.clone()) - .await? - .relay_url; + (SendSession::WithReplyKey(sender), persister) + }; self.proceed_sender_session( - SendSession::WithReplyKey(sender), + sender_state, &persister, - selected_relay.to_string(), + ohttp_relays, blockchain_client, ) .await? @@ -360,13 +408,8 @@ impl<'a> PayjoinManager<'a> { blockchain_client: &BlockchainClient, ) -> Result<(), Error> { let next_receiver_typestate = receiver - .check_inputs_not_owned(&mut |input| { - Ok(self.wallet.is_mine(input.to_owned())) - }) - .save(persister) - .map_err(|e| { - Error::Generic(format!("Error occurred when saving after checking if inputs in the original proposal are not owned: {e}")) - })?; + .check_inputs_not_owned(&mut |input| Ok(self.wallet.is_mine(input.to_owned()))) + .save(persister)?; self.check_no_inputs_seen_before( next_receiver_typestate, @@ -384,16 +427,11 @@ impl<'a> PayjoinManager<'a> { max_fee_rate: FeeRate, blockchain_client: &BlockchainClient, ) -> Result<(), Error> { - // This is not supported as there is no persistence of previous Payjoin attempts in BDK CLI - // yet. If there is support either in the BDK persister or Payjoin persister, this can be - // implemented, but it is not a concern as the use cases of the CLI does not warrant - // protection against probing attacks. - println!( - "Checking whether the inputs in the proposal were seen before to protect from probing attacks is not supported. Skipping the check..." - ); - let next_receiver_typestate = receiver.check_no_inputs_seen_before(&mut |_| Ok(false)).save(persister).map_err(|e| { - Error::Generic(format!("Error occurred when saving after checking if the inputs in the proposal were seen before: {e}")) - })?; + let db = self.db.clone(); + let next_receiver_typestate = receiver + .check_no_inputs_seen_before(&mut |input| Ok(db.insert_input_seen_before(*input)?)) + .save(persister)?; + self.identify_receiver_outputs( next_receiver_typestate, persister, @@ -410,11 +448,11 @@ impl<'a> PayjoinManager<'a> { max_fee_rate: FeeRate, blockchain_client: &BlockchainClient, ) -> Result<(), Error> { - let next_receiver_typestate = receiver.identify_receiver_outputs(&mut |output_script| { + let next_receiver_typestate = receiver + .identify_receiver_outputs(&mut |output_script| { Ok(self.wallet.is_mine(output_script.to_owned())) - }).save(persister).map_err(|e| { - Error::Generic(format!("Error occurred when saving after checking if the outputs in the original proposal are owned by the receiver: {e}")) - })?; + }) + .save(persister)?; self.commit_outputs( next_receiver_typestate, @@ -498,9 +536,9 @@ impl<'a> PayjoinManager<'a> { max_fee_rate: FeeRate, blockchain_client: &BlockchainClient, ) -> Result<(), Error> { - let next_receiver_typestate = receiver.apply_fee_range(None, Some(max_fee_rate)).save(persister).map_err(|e| { - Error::Generic(format!("Error occurred when saving after applying the receiver fee range to the transaction: {e}")) - })?; + let next_receiver_typestate = receiver + .apply_fee_range(None, Some(max_fee_rate)) + .save(persister)?; self.finalize_proposal(next_receiver_typestate, persister, blockchain_client) .await } @@ -528,12 +566,7 @@ impl<'a> PayjoinManager<'a> { Ok(psbt_clone) }) - .save(persister) - .map_err(|e| { - Error::Generic(format!( - "Error occurred when saving after signing the Payjoin proposal: {e}" - )) - })?; + .save(persister)?; self.send_payjoin_proposal(next_receiver_typestate, persister, blockchain_client) .await @@ -545,22 +578,21 @@ impl<'a> PayjoinManager<'a> { persister: &impl SessionPersister, blockchain_client: &BlockchainClient, ) -> Result<(), Error> { - let (req, ctx) = receiver.create_post_request( - self.relay_manager - .lock() - .expect("Lock should not be poisoned") - .get_selected_relay() - .expect("A relay should already be selected") + let (req, ctx) = receiver + .create_post_request( + self.unwrap_relay_or_else_fetch(vec![], None::<&str>) + .await? .as_str(), - ).map_err(|e| { + ) + .map_err(|e| { Error::Generic(format!("Error occurred when creating a post request for sending final Payjoin proposal: {e}")) })?; let res = self.send_payjoin_post_request(req).await?; let payjoin_psbt = receiver.psbt().clone(); - let next_receiver_typestate = receiver.process_response(&res.bytes().await?, ctx).save(persister).map_err(|e| { - Error::Generic(format!("Error occurred when saving after processing the response to the Payjoin proposal send: {e}")) - })?; + let next_receiver_typestate = receiver + .process_response(&res.bytes().await?, ctx) + .save(persister)?; println!( "Response successful. TXID: {}", payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid() @@ -623,10 +655,7 @@ impl<'a> PayjoinManager<'a> { } } ) - .save(persister) - .map_err(|e| { - Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}")) - }); + .save(persister); if let Ok(OptionalTransitionOutcome::Progress(_)) = check_result { println!("Payjoin transaction detected in the mempool!"); @@ -659,11 +688,8 @@ impl<'a> PayjoinManager<'a> { ) -> Result<(), Error> { let (err_req, err_ctx) = receiver .create_error_request( - self.relay_manager - .lock() - .expect("Lock should not be poisoned") - .get_selected_relay() - .expect("A relay should already be selected") + self.unwrap_relay_or_else_fetch(vec![], None::<&str>) + .await? .as_str(), ) .map_err(|e| { @@ -710,16 +736,32 @@ impl<'a> PayjoinManager<'a> { &self, session: SendSession, persister: &impl SessionPersister, - relay: impl payjoin::IntoUrl, + ohttp_relays: Vec, blockchain_client: &BlockchainClient, ) -> Result { match session { SendSession::WithReplyKey(context) => { - self.post_original_proposal(context, relay, persister, blockchain_client) + let relay = self + .unwrap_relay_or_else_fetch(ohttp_relays, Some(context.endpoint())) + .await?; + self.post_original_proposal( + context, + persister, + blockchain_client, + relay.to_string(), + ) .await } SendSession::PollingForProposal(context) => { - self.get_proposed_payjoin_proposal(context, relay, persister, blockchain_client) + let relay = self + .unwrap_relay_or_else_fetch(ohttp_relays, Some(context.endpoint())) + .await?; + self.get_proposed_payjoin_proposal( + context, + persister, + blockchain_client, + relay.to_string(), + ) .await } SendSession::Closed(SenderSessionOutcome::Success(psbt)) => { @@ -729,31 +771,53 @@ impl<'a> PayjoinManager<'a> { } } + async fn unwrap_relay_or_else_fetch( + &self, + ohttp_relays: Vec, + directory: Option, + ) -> Result { + let selected_relay = self + .relay_manager + .lock() + .expect("Lock should not be poisoned") + .get_selected_relay(); + match selected_relay { + Some(relay) => Ok(relay), + None => { + let directory = directory.ok_or_else(|| { + Error::Generic("No directory URL provided and no relay selected".to_string()) + })?; + Ok( + fetch_ohttp_keys(ohttp_relays, directory, self.relay_manager.clone()) + .await? + .relay_url, + ) + } + } + } + async fn post_original_proposal( &self, sender: Sender, - relay: impl payjoin::IntoUrl, persister: &impl SessionPersister, blockchain_client: &BlockchainClient, + relay: String, ) -> Result { let (req, ctx) = sender.create_v2_post_request(relay.as_str())?; let response = self.send_payjoin_post_request(req).await?; let sender = sender .process_response(&response.bytes().await?, ctx) - .save(persister) - .map_err(|e| { - Error::Generic(format!("Failed to persist the Payjoin send after successfully sending original proposal: {e}")) - })?; - self.get_proposed_payjoin_proposal(sender, relay, persister, blockchain_client) + .save(persister)?; + self.get_proposed_payjoin_proposal(sender, persister, blockchain_client, relay) .await } async fn get_proposed_payjoin_proposal( &self, sender: Sender, - relay: impl payjoin::IntoUrl, persister: &impl SessionPersister, blockchain_client: &BlockchainClient, + relay: String, ) -> Result { let mut sender = sender.clone(); loop { From a676d69a426edc2e0bc580d208530982c176c4cd Mon Sep 17 00:00:00 2001 From: Mshehu5 Date: Mon, 16 Feb 2026 14:50:06 +0100 Subject: [PATCH 3/4] feat: add resume and history commands for payjoin sessions - Implement resume_payjoins() to continue pending sessions - Add history() to display all session states - Add session status text helpers for UI display - Support filtering by session ID --- src/commands.rs | 14 ++ src/payjoin/mod.rs | 323 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 325 insertions(+), 12 deletions(-) diff --git a/src/commands.rs b/src/commands.rs index 14ad9ea..4cebcf0 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -479,6 +479,20 @@ pub enum OnlineWalletSubCommand { )] fee_rate: u64, }, + /// Resume pending payjoin sessions. + ResumePayjoin { + /// Payjoin directory for the session + #[arg(env = "PAYJOIN_DIRECTORY", long = "directory", required = true)] + directory: String, + /// URL of the Payjoin OHTTP relay. Can be repeated multiple times. + #[arg(env = "PAYJOIN_OHTTP_RELAY", long = "ohttp_relay", required = true)] + ohttp_relay: Vec, + /// Resume only a specific active session ID (sender and/or receiver). + #[arg(env = "PAYJOIN_SESSION_ID", long = "session_id")] + session_id: Option, + }, + /// Show payjoin session history. + PayjoinHistory, } /// Subcommands for Key operations. diff --git a/src/payjoin/mod.rs b/src/payjoin/mod.rs index 319b8d9..7ea7269 100644 --- a/src/payjoin/mod.rs +++ b/src/payjoin/mod.rs @@ -42,6 +42,60 @@ pub(crate) struct PayjoinManager<'a> { relay_manager: Arc>, db: Arc, } + +trait StatusText { + fn status_text(&self) -> &'static str; +} + +impl StatusText for SendSession { + fn status_text(&self) -> &'static str { + match self { + SendSession::WithReplyKey(_) | SendSession::PollingForProposal(_) => { + "Waiting for proposal" + } + SendSession::Closed(session_outcome) => match session_outcome { + SenderSessionOutcome::Failure => "Session failure", + SenderSessionOutcome::Success(_) => "Session success", + SenderSessionOutcome::Cancel => "Session cancelled", + }, + } + } +} + +impl StatusText for ReceiveSession { + fn status_text(&self) -> &'static str { + match self { + ReceiveSession::Initialized(_) => "Waiting for original proposal", + ReceiveSession::UncheckedOriginalPayload(_) + | ReceiveSession::MaybeInputsOwned(_) + | ReceiveSession::MaybeInputsSeen(_) + | ReceiveSession::OutputsUnknown(_) + | ReceiveSession::WantsOutputs(_) + | ReceiveSession::WantsInputs(_) + | ReceiveSession::WantsFeeRange(_) + | ReceiveSession::ProvisionalProposal(_) => "Processing original proposal", + ReceiveSession::PayjoinProposal(_) => "Payjoin proposal sent", + ReceiveSession::HasReplyableError(_) => { + "Session failure, waiting to post error response" + } + ReceiveSession::Monitor(_) => "Monitoring payjoin proposal", + ReceiveSession::Closed(session_outcome) => match session_outcome { + ReceiverSessionOutcome::Failure => "Session failure", + ReceiverSessionOutcome::Success(_) => { + "Session success, Payjoin proposal was broadcasted" + } + ReceiverSessionOutcome::Cancel => "Session cancelled", + ReceiverSessionOutcome::FallbackBroadcasted => "Fallback broadcasted", + }, + } + } +} + +struct SessionHistoryRow { + id: String, + role: &'static str, + status: String, + completed_at: Option, } impl<'a> PayjoinManager<'a> { @@ -230,14 +284,14 @@ impl<'a> PayjoinManager<'a> { SenderPersister::new(self.db.clone(), receiver_pubkey)? }; - let sender = payjoin::send::v2::SenderBuilder::new(original_psbt.clone(), uri) - .build_recommended(fee_rate)? - .save(&persister) - .map_err(|e| { - Error::Generic(format!( - "Failed to save the Payjoin v2 sender in the persister: {e}" - )) - })?; + let sender = payjoin::send::v2::SenderBuilder::new(original_psbt.clone(), uri) + .build_recommended(fee_rate)? + .save(&persister) + .map_err(|e| { + Error::Generic(format!( + "Failed to save the Payjoin v2 sender in the persister: {e}" + )) + })?; (SendSession::WithReplyKey(sender), persister) }; @@ -450,7 +504,7 @@ impl<'a> PayjoinManager<'a> { ) -> Result<(), Error> { let next_receiver_typestate = receiver .identify_receiver_outputs(&mut |output_script| { - Ok(self.wallet.is_mine(output_script.to_owned())) + Ok(self.wallet.is_mine(output_script.to_owned())) }) .save(persister)?; @@ -582,7 +636,7 @@ impl<'a> PayjoinManager<'a> { .create_post_request( self.unwrap_relay_or_else_fetch(vec![], None::<&str>) .await? - .as_str(), + .as_str(), ) .map_err(|e| { Error::Generic(format!("Error occurred when creating a post request for sending final Payjoin proposal: {e}")) @@ -750,7 +804,7 @@ impl<'a> PayjoinManager<'a> { blockchain_client, relay.to_string(), ) - .await + .await } SendSession::PollingForProposal(context) => { let relay = self @@ -762,7 +816,7 @@ impl<'a> PayjoinManager<'a> { blockchain_client, relay.to_string(), ) - .await + .await } SendSession::Closed(SenderSessionOutcome::Success(psbt)) => { self.process_payjoin_proposal(psbt, blockchain_client).await @@ -871,4 +925,249 @@ impl<'a> PayjoinManager<'a> { .send() .await } + + /// Resume pending payjoin sessions from the database + pub async fn resume_payjoins( + &mut self, + directory: String, + ohttp_relays: Vec, + session_id: Option, + blockchain_client: &BlockchainClient, + ) -> Result { + let db = self.db.clone(); + let mut recv_session_ids = db.get_recv_session_ids()?; + let mut send_session_ids = db.get_send_session_ids()?; + + if let Some(session_id) = session_id { + recv_session_ids.retain(|id| id.as_i64() == session_id); + send_session_ids.retain(|id| id.as_i64() == session_id); + + if recv_session_ids.is_empty() && send_session_ids.is_empty() { + return Ok(serde_json::to_string_pretty(&json!({ + "message": format!("No active session found for session_id {}.", session_id) + }))?); + } + } + + if recv_session_ids.is_empty() && send_session_ids.is_empty() { + return Ok(serde_json::to_string_pretty(&json!({ + "message": "No sessions to resume." + }))?); + } + + let ohttp_relays: Vec = ohttp_relays + .into_iter() + .map(|s| url::Url::parse(&s)) + .collect::>() + .map_err(|e| Error::Generic(format!("Failed to parse OHTTP URLs: {e}")))?; + + let relay = self + .unwrap_relay_or_else_fetch(ohttp_relays, Some(&directory)) + .await?; + + let max_fee_rate = FeeRate::BROADCAST_MIN; + let total_sessions = recv_session_ids.len() + send_session_ids.len(); + let mut completed = 0usize; + let mut timed_out = 0usize; + let mut failed = 0usize; + + println!("Resuming {} payjoin session(s)...\n", total_sessions); + + // Resume receiver sessions + for session_id in recv_session_ids { + let persister = ReceiverPersister::from_id(db.clone(), session_id.clone()); + match replay_receiver_event_log(&persister) { + Ok((receiver_state, _)) => { + println!("Resuming receiver session {}", session_id); + match tokio::time::timeout( + std::time::Duration::from_secs(30), + self.proceed_receiver_session( + receiver_state, + &persister, + relay.as_str(), + max_fee_rate, + blockchain_client, + ), + ) + .await + { + Ok(Ok(_)) => { + completed += 1; + } + Ok(Err(e)) => { + failed += 1; + println!("Receiver session {} failed: {}", session_id, e); + } + Err(_) => { + timed_out += 1; + println!("Receiver session {} timed out", session_id); + } + } + } + Err(e) => { + failed += 1; + println!("Failed to replay receiver session {}: {:?}", session_id, e); + } + } + } + + // Resume sender sessions + for session_id in send_session_ids { + let persister = SenderPersister::from_id(db.clone(), session_id.clone()); + match replay_sender_event_log(&persister) { + Ok((sender_state, _)) => { + println!("Resuming sender session {}", session_id); + match tokio::time::timeout( + std::time::Duration::from_secs(30), + self.proceed_sender_session( + sender_state, + &persister, + vec![relay.clone()], + blockchain_client, + ), + ) + .await + { + Ok(Ok(_)) => { + completed += 1; + } + Ok(Err(e)) => { + failed += 1; + println!("Sender session {} failed: {}", session_id, e); + } + Err(_) => { + timed_out += 1; + println!("Sender session {} timed out", session_id); + } + } + } + Err(e) => { + failed += 1; + println!("Failed to replay sender session {}: {:?}", session_id, e); + } + } + } + + Ok(serde_json::to_string_pretty(&json!({ + "message": format!("Resumed polling for {} session(s).", total_sessions), + "outcome": format!( + "Completed: {}, timed out: {}, failed: {}.", + completed, timed_out, failed + ) + }))?) + } + + /// Show payjoin session history + pub fn history(datadir: Option) -> Result { + let db = open_payjoin_db(datadir)?; + let mut send_rows: Vec = Vec::new(); + let mut recv_rows: Vec = Vec::new(); + + // Active send sessions + for session_id in db + .get_send_session_ids() + .map_err(|e| Error::Generic(format!("{e}")))? + { + let persister = SenderPersister::from_id(db.clone(), session_id.clone()); + let status = match replay_sender_event_log(&persister) { + Ok((state, _)) => state.status_text().to_string(), + Err(e) => e.to_string(), + }; + send_rows.push(SessionHistoryRow { + id: session_id.to_string(), + role: "Sender", + status, + completed_at: None, + }); + } + + // Active receive sessions + for session_id in db + .get_recv_session_ids() + .map_err(|e| Error::Generic(format!("{e}")))? + { + let persister = ReceiverPersister::from_id(db.clone(), session_id.clone()); + let status = match replay_receiver_event_log(&persister) { + Ok((state, _)) => state.status_text().to_string(), + Err(e) => e.to_string(), + }; + recv_rows.push(SessionHistoryRow { + id: session_id.to_string(), + role: "Receiver", + status, + completed_at: None, + }); + } + + // Completed send sessions + for (session_id, completed_at) in db + .get_inactive_send_session_ids() + .map_err(|e| Error::Generic(format!("{e}")))? + { + let persister = SenderPersister::from_id(db.clone(), session_id.clone()); + let status = match replay_sender_event_log(&persister) { + Ok((state, _)) => state.status_text().to_string(), + Err(e) => e.to_string(), + }; + let completed_at = db + .format_unix_timestamp(completed_at) + .map_err(|e| Error::Generic(format!("{e}")))?; + send_rows.push(SessionHistoryRow { + id: session_id.to_string(), + role: "Sender", + status, + completed_at: Some(completed_at), + }); + } + + // Completed receive sessions + for (session_id, completed_at) in db + .get_inactive_recv_session_ids() + .map_err(|e| Error::Generic(format!("{e}")))? + { + let persister = ReceiverPersister::from_id(db.clone(), session_id.clone()); + let status = match replay_receiver_event_log(&persister) { + Ok((state, _)) => state.status_text().to_string(), + Err(e) => e.to_string(), + }; + let completed_at = db + .format_unix_timestamp(completed_at) + .map_err(|e| Error::Generic(format!("{e}")))?; + recv_rows.push(SessionHistoryRow { + id: session_id.to_string(), + role: "Receiver", + status, + completed_at: Some(completed_at), + }); + } + + let rows: Vec> = send_rows + .iter() + .chain(recv_rows.iter()) + .map(|row| { + vec![ + row.id.as_str().cell(), + row.role.cell(), + row.completed_at + .clone() + .unwrap_or_else(|| "Not Completed".to_string()) + .cell(), + row.status.as_str().cell(), + ] + }) + .collect(); + + let table = rows + .table() + .title(vec![ + "Session ID".cell().bold(true), + "Sender/Receiver".cell().bold(true), + "Completed At".cell().bold(true), + "Status".cell().bold(true), + ]) + .display() + .map_err(|e| Error::Generic(e.to_string()))?; + + Ok(format!("{table}")) + } } From c79bd952f7fd6aa368791a348a83cfefbffdeb6f Mon Sep 17 00:00:00 2001 From: Mshehu5 Date: Mon, 16 Feb 2026 14:51:18 +0100 Subject: [PATCH 4/4] docs: update README with new payjoin session commands - Document resume and history commands - Add sqlite dependecy for payjoin --- Cargo.toml | 2 +- README.md | 27 ++++++++++++++++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b7d610a..ee60e6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ esplora = ["bdk_esplora", "_payjoin-dependencies"] rpc = ["bdk_bitcoind_rpc", "_payjoin-dependencies"] # Internal features -_payjoin-dependencies = ["payjoin", "reqwest", "url"] +_payjoin-dependencies = ["payjoin", "reqwest", "url", "sqlite"] # Use this to consensus verify transactions at sync time verify = [] diff --git a/README.md b/README.md index d905197..8a18ea4 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ And yes, it can do Taproot!! This crate can be used for the following purposes: - Instantly create a miniscript based wallet and connect to your backend of choice (Electrum, Esplora, Core RPC, Kyoto etc) and quickly play around with your own complex bitcoin scripting workflow. With one or many wallets, connected with one or many backends. - The `tests/integration.rs` module is used to document high level complex workflows between BDK and different Bitcoin infrastructure systems, like Core, Electrum and Lightning(soon TM). - - Receive and send Async Payjoins. Note that even though Async Payjoin as a protocol allows the receiver and sender to go offline during the payjoin, the BDK CLI implementation currently does not support persisting. + - Receive and send Async Payjoins with session persistence. Sessions can be resumed if interrupted. - (Planned) Expose the basic command handler via `wasm` to integrate `bdk-cli` functionality natively into the web platform. See also the [playground](https://bitcoindevkit.org/bdk-cli/playground/) page. If you are considering using BDK in your own wallet project bdk-cli is a nice playground to get started with. It allows easy testnet and regtest wallet operations, to try out what's possible with descriptors, miniscript, and BDK APIs. For more information on BDK refer to the [website](https://bitcoindevkit.org/) and the [rust docs](https://docs.rs/bdk_wallet/1.0.0/bdk_wallet/index.html) @@ -140,6 +140,31 @@ cargo run --features rpc -- wallet --wallet payjoin_wallet2 balance cargo run --features rpc -- wallet --wallet payjoin_wallet2 send_payjoin --ohttp_relay "https://pj.bobspacebkk.com" --ohttp_relay "https://pj.benalleng.com" --fee_rate 1 --uri "" ``` +### Payjoin Session Persistence + +Payjoin sessions are automatically persisted to a SQLite database (`payjoin.sqlite`) in the data directory. This allows sessions to be resumed if interrupted. + +#### Resume Payjoin Sessions + +Resume all pending sessions: +``` +cargo run --features rpc -- wallet --wallet resume_payjoin --directory "https://payjo.in" --ohttp_relay "https://pj.bobspacebkk.com" +``` + +Resume a specific session by ID: +``` +cargo run --features rpc -- wallet --wallet resume_payjoin --directory "https://payjo.in" --ohttp_relay "https://pj.bobspacebkk.com" --session_id +``` + +Sessions are processed sequentially (not concurrently) due to BDK-CLI's architecture. Each session waits up to 30 seconds for updates before timing out. If no session ID is specified, the most recent active sessions are resumed first. + +#### View Session History + +View all payjoin sessions (active and completed) and also see their status: +``` +cargo run -- wallet --wallet payjoin_history +``` + ## Justfile We have added the `just` command runner to help you with common commands (during development) and running regtest `bitcoind` if you are using the `rpc` feature.