@@ -2,7 +2,7 @@ use crate::{
22 error:: Error , metrics:: update_bitcoin_metrics, storage:: TransactionStore , system:: VaultData , VaultIdManager ,
33} ;
44use bitcoin:: {
5- BitcoinCoreApi , LockedTransaction , PartialAddress , TransactionExt , TransactionMetadata ,
5+ BitcoinCoreApi , LockedTransaction , PartialAddress , Transaction , TransactionExt , TransactionMetadata , Txid ,
66 BLOCK_INTERVAL as BITCOIN_BLOCK_INTERVAL ,
77} ;
88use futures:: {
@@ -172,6 +172,57 @@ impl Request {
172172 }
173173 }
174174
175+ async fn wait_and_execute <
176+ B : BitcoinCoreApi + Clone + Send + Sync + ' static ,
177+ P : ReplacePallet
178+ + RefundPallet
179+ + BtcRelayPallet
180+ + RedeemPallet
181+ + SecurityPallet
182+ + VaultRegistryPallet
183+ + UtilFuncs
184+ + Clone
185+ + Send
186+ + Sync ,
187+ > (
188+ & self ,
189+ parachain_rpc : P ,
190+ btc_rpc : B ,
191+ txid : Txid ,
192+ num_confirmations : u32 ,
193+ ) {
194+ // Payment has been made, but it might not have been confirmed enough times yet
195+ let tx_metadata = btc_rpc. wait_for_transaction_metadata ( txid, num_confirmations) . await ;
196+
197+ match tx_metadata {
198+ Ok ( tx_metadata) => {
199+ // we have enough btc confirmations, now make sure they have been relayed before we continue
200+ if let Err ( e) = parachain_rpc
201+ . wait_for_block_in_relay (
202+ H256Le :: from_bytes_le ( & tx_metadata. block_hash . to_vec ( ) ) ,
203+ Some ( num_confirmations) ,
204+ )
205+ . await
206+ {
207+ tracing:: error!(
208+ "Error while waiting for block inclusion for request #{}: {}" ,
209+ self . hash,
210+ e
211+ ) ;
212+ // continue; try to execute anyway
213+ }
214+
215+ match self . execute ( parachain_rpc. clone ( ) , tx_metadata) . await {
216+ Ok ( _) => {
217+ tracing:: info!( "Executed request #{:?}" , self . hash) ;
218+ }
219+ Err ( e) => tracing:: error!( "Failed to execute request #{}: {}" , self . hash, e) ,
220+ }
221+ }
222+ Err ( e) => tracing:: error!( "Failed to confirm bitcoin transaction for request {}: {}" , self . hash, e) ,
223+ }
224+ }
225+
175226 /// Makes the bitcoin transfer and executes the request
176227 pub async fn pay_and_execute <
177228 B : BitcoinCoreApi + Clone + Send + Sync + ' static ,
@@ -355,6 +406,7 @@ pub async fn execute_open_requests<B, TS>(
355406 shutdown_tx : ShutdownSender ,
356407 parachain_rpc : InterBtcParachain ,
357408 vault_id_manager : VaultIdManager < B > ,
409+ read_only_btc_rpc : B ,
358410 tx_store : Arc < TS > ,
359411 num_confirmations : u32 ,
360412 payment_margin : Duration ,
@@ -413,6 +465,7 @@ where
413465 . map ( |x| ( x. hash , x) )
414466 . collect :: < HashMap < _ , _ > > ( ) ;
415467
468+ // 1. check tx store for request txs
416469 for ( hash, request) in open_requests. clone ( ) . into_iter ( ) {
417470 // get the request this transaction corresponds to, if any
418471 if let Ok ( tx) = tx_store. get_tx ( & hash) {
@@ -448,48 +501,68 @@ where
448501 // try sending but ignore the result as it may have already been processed
449502 let _ = btc_rpc. send_transaction ( locked_tx) . await ;
450503
451- // Payment has been made, but it might not have been confirmed enough times yet
452- let tx_metadata = btc_rpc
453- . clone ( )
454- . wait_for_transaction_metadata ( txid, num_confirmations)
504+ request
505+ . wait_and_execute ( parachain_rpc, btc_rpc, txid, num_confirmations)
455506 . await ;
507+ } ) ;
508+ }
509+ }
510+
511+ // find the height of bitcoin chain corresponding to the earliest btc_height
512+ let btc_start_height = match open_requests
513+ . iter ( )
514+ . map ( |( _, request) | request. btc_height . unwrap_or ( u32:: MAX ) )
515+ . min ( )
516+ {
517+ Some ( x) => x,
518+ None => return Ok ( ( ) ) , // the iterator is empty so we have nothing to do
519+ } ;
456520
457- match tx_metadata {
458- Ok ( tx_metadata) => {
459- // we have enough btc confirmations, now make sure they have been relayed before we continue
460- if let Err ( e) = parachain_rpc
461- . wait_for_block_in_relay (
462- H256Le :: from_bytes_le ( & tx_metadata. block_hash . to_vec ( ) ) ,
463- Some ( num_confirmations) ,
464- )
465- . await
466- {
467- tracing:: error!(
468- "Error while waiting for block inclusion for request #{}: {}" ,
469- request. hash,
470- e
471- ) ;
472- // continue; try to execute anyway
473- }
474-
475- match request. execute ( parachain_rpc. clone ( ) , tx_metadata) . await {
476- Ok ( _) => {
477- tracing:: info!( "Executed request #{:?}" , request. hash) ;
478- }
479- Err ( e) => tracing:: error!( "Failed to execute request #{}: {}" , request. hash, e) ,
480- }
521+ // 2. fallback to mempool / blocks to find payments (for backward compatibility)
522+ // iterate through transactions in reverse order, starting from those in the mempool
523+ let mut transaction_stream = bitcoin:: reverse_stream_transactions ( & read_only_btc_rpc, btc_start_height) . await ?;
524+ while let Some ( result) = transaction_stream. next ( ) . await {
525+ let tx = match result {
526+ Ok ( x) => x,
527+ Err ( e) => {
528+ tracing:: warn!( "Failed to process transaction: {}" , e) ;
529+ continue ;
530+ }
531+ } ;
532+
533+ // get the request this transaction corresponds to, if any
534+ if let Some ( request) = get_request_for_btc_tx ( & tx, & open_requests) {
535+ open_requests. remove ( & request. hash ) ;
536+
537+ tracing:: info!(
538+ "{:?} request #{:?} has valid bitcoin payment - processing..." ,
539+ request. request_type,
540+ request. hash
541+ ) ;
542+ // start a new task to (potentially) await confirmation and to execute on the parachain
543+ // make copies of the variables we move into the task
544+ let parachain_rpc = parachain_rpc. clone ( ) ;
545+ let btc_rpc = vault_id_manager. clone ( ) ;
546+ spawn_cancelable ( shutdown_tx. subscribe ( ) , async move {
547+ let btc_rpc = match btc_rpc. get_bitcoin_rpc ( & request. vault_id ) . await {
548+ Some ( x) => x,
549+ None => {
550+ tracing:: error!(
551+ "Failed to fetch bitcoin rpc for vault {}" ,
552+ request. vault_id. pretty_print( )
553+ ) ;
554+ return ; // nothing we can do - bail
481555 }
482- Err ( e) => tracing:: error!(
483- "Failed to confirm bitcoin transaction for request {}: {}" ,
484- request. hash,
485- e
486- ) ,
487- }
556+ } ;
557+
558+ request
559+ . wait_and_execute ( parachain_rpc, btc_rpc, tx. txid ( ) , num_confirmations)
560+ . await ;
488561 } ) ;
489562 }
490563 }
491564
492- // All requests remaining in the hashmap did not have a bitcoin payment yet, so pay
565+ // All requests remaining in the hashmap do not have a bitcoin payment yet, so pay
493566 // and execute all of these
494567 for ( _, request) in open_requests {
495568 // there are potentially a large number of open requests - pay and execute each
@@ -539,6 +612,19 @@ where
539612 Ok ( ( ) )
540613}
541614
615+ /// Get the Request from the hashmap that the given Transaction satisfies, based
616+ /// on the OP_RETURN and the amount of btc that is transfered to the address
617+ fn get_request_for_btc_tx ( tx : & Transaction , hash_map : & HashMap < H256 , Request > ) -> Option < Request > {
618+ let hash = tx. get_op_return ( ) ?;
619+ let request = hash_map. get ( & hash) ?;
620+ let paid_amount = tx. get_payment_amount_to ( request. btc_address ) ?;
621+ if paid_amount as u128 >= request. amount {
622+ Some ( request. clone ( ) )
623+ } else {
624+ None
625+ }
626+ }
627+
542628#[ cfg( all( test, feature = "standalone-metadata" ) ) ]
543629mod tests {
544630 use crate :: metrics:: PerCurrencyMetrics ;
@@ -547,7 +633,7 @@ mod tests {
547633 use async_trait:: async_trait;
548634 use bitcoin:: {
549635 json, Amount , Block , BlockHash , BlockHeader , Error as BitcoinError , GetBlockResult , Network , PartialAddress ,
550- PrivateKey , Transaction , TransactionMetadata , Txid , PUBLIC_KEY_SIZE ,
636+ PrivateKey , Transaction , TransactionMetadata , PUBLIC_KEY_SIZE ,
551637 } ;
552638 use runtime:: {
553639 AccountId , BlockNumber , BtcPublicKey , CurrencyId , Error as RuntimeError , ErrorCode , InterBtcRichBlockHeader ,
0 commit comments