@@ -937,6 +937,358 @@ fn test_partial_claim_before_restart() {
937937 do_test_partial_claim_before_restart ( true , true ) ;
938938}
939939
// Tests that HTLC fulfills for an MPP claim which were blocked behind the in-memory MPP RAA
// blocker (state which is deliberately *not* serialized with the ChannelManager) are correctly
// released after a restart in which one ChannelMonitor is up-to-date and the other is stale.
#[test]
fn test_mpp_claim_htlc_fulfills_unblocked_on_reload() {
	let chanmon_cfgs = create_chanmon_cfgs(2);
	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
	let persister;
	let new_chain_monitor;
	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
	let nodes_1_deserialized;
	let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);

	// Open two independent channels between the same nodes. The payment below is large enough to
	// force the router to split it across both channels, which is what makes the MPP claim depend
	// on both ChannelMonitors durably learning the preimage.
	let chan_a = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100_000, 0);
	let chan_b = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100_000, 0);
	let chan_id_a = chan_a.2;
	let chan_id_b = chan_b.2;
	let scid_a = chan_a.0.contents.short_channel_id;
	let scid_b = chan_b.0.contents.short_channel_id;

	// Send an MPP payment to nodes[1]. `send_along_route_with_secret` leaves the payment
	// claimable but unclaimed, so nodes[1] still has both inbound HTLCs live when we start
	// manipulating monitor persistence below.
	let amt_msat = 15_000_000;
	let (route, payment_hash, payment_preimage, payment_secret) =
		get_route_and_payment_hash!(nodes[0], nodes[1], amt_msat);
	assert_eq!(route.paths.len(), 2);
	send_along_route_with_secret(
		&nodes[0], route, &[&[&nodes[1]], &[&nodes[1]]], amt_msat, payment_hash,
		payment_secret,
	);

	// Move both channels into `AWAITING_REMOTE_REVOKE` by having nodes[0] send fee updates and
	// withholding nodes[1]'s responding `commitment_signed`s. When nodes[1] later claims the
	// payment, the fulfill updates cannot be sent immediately and instead sit in each channel's
	// holding cell.
	{
		// Double the feerate so the next timer tick generates an `update_fee` on each channel.
		let mut fee_est = chanmon_cfgs[0].fee_estimator.sat_per_kw.lock().unwrap();
		*fee_est *= 2;
	}
	nodes[0].node.timer_tick_occurred();
	check_added_monitors(&nodes[0], 2);

	let node_0_id = nodes[0].node.get_our_node_id();
	let node_1_id = nodes[1].node.get_our_node_id();

	let fee_msgs = nodes[0].node.get_and_clear_pending_msg_events();
	assert_eq!(fee_msgs.len(), 2);
	for ev in &fee_msgs {
		match ev {
			MessageSendEvent::UpdateHTLCs { updates, .. } => {
				nodes[1].node.handle_update_fee(node_0_id, updates.update_fee.as_ref().unwrap());
				nodes[1].node.handle_commitment_signed_batch_test(
					node_0_id, &updates.commitment_signed,
				);
				check_added_monitors(&nodes[1], 1);
			},
			_ => panic!("Unexpected message: {:?}", ev),
		}
	}

	// nodes[1] responds to each fee update with a `revoke_and_ack` and a new
	// `commitment_signed`. Deliver only the `revoke_and_ack`s for now. The held
	// `commitment_signed`s are delivered after nodes[1] claims the payment, creating the blocked
	// post-claim monitor updates whose release is exercised after reload.
	let node_1_msgs = nodes[1].node.get_and_clear_pending_msg_events();
	let mut commitment_signed_msgs = Vec::new();
	for ev in &node_1_msgs {
		match ev {
			MessageSendEvent::SendRevokeAndACK { msg, .. } => {
				nodes[0].node.handle_revoke_and_ack(node_1_id, msg);
				check_added_monitors(&nodes[0], 1);
			},
			MessageSendEvent::UpdateHTLCs { updates, .. } => {
				commitment_signed_msgs.push(updates.commitment_signed.clone());
			},
			_ => panic!("Unexpected message: {:?}", ev),
		}
	}

	let node_0_msgs = nodes[0].node.get_and_clear_pending_msg_events();
	for ev in &node_0_msgs {
		match ev {
			MessageSendEvent::SendRevokeAndACK { msg, .. } => {
				nodes[1].node.handle_revoke_and_ack(node_0_id, msg);
				check_added_monitors(&nodes[1], 1);
			},
			_ => panic!("Unexpected message: {:?}", ev),
		}
	}

	// Snapshot channel B before the claim. The in-memory ChainMonitor applies updates even when
	// the persister returns `InProgress`, so taking this snapshot after the claim would not model a
	// crash between two separate monitor writes.
	let mon_b_serialized = get_monitor!(nodes[1], chan_id_b).encode();

	// Make both preimage monitor writes asynchronous. `claim_funds` attaches an in-memory MPP RAA
	// blocker so neither channel can release later monitor updates until all channels have the
	// preimage durably persisted.
	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
	nodes[1].node.claim_funds(payment_preimage);
	check_added_monitors(&nodes[1], 2);

	// Complete only channel A's preimage update. Channel B will be reloaded from the stale snapshot
	// above, simulating a crash where one monitor write completed and the other did not.
	let (update_id_a, _) = get_latest_mon_update_id(&nodes[1], chan_id_a);
	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_id_a, update_id_a);

	// Now finish the fee-update commitment dance we held back. nodes[1] receives nodes[0]'s
	// `revoke_and_ack`s while the MPP RAA blocker is still in place, so the resulting monitor
	// updates are blocked behind state that is not serialized in the ChannelManager.
	for commitment_signed in &commitment_signed_msgs {
		nodes[0].node.handle_commitment_signed_batch_test(node_1_id, commitment_signed);
		check_added_monitors(&nodes[0], 1);
	}
	let node_0_msgs = nodes[0].node.get_and_clear_pending_msg_events();
	for ev in &node_0_msgs {
		match ev {
			MessageSendEvent::SendRevokeAndACK { msg, .. } => {
				nodes[1].node.handle_revoke_and_ack(node_0_id, msg);
				// No monitor write here: the RAA-generated update is blocked by the MPP blocker.
				check_added_monitors(&nodes[1], 0);
			},
			_ => panic!("Unexpected message: {:?}", ev),
		}
	}

	// Persist the ChannelManager after the blocked post-claim monitor updates have been recorded.
	// Reload with channel A's up-to-date monitor and channel B's stale monitor. The preimage update
	// for B is replayed during reload, putting both channels' preimages on disk. The remaining state
	// under test is the blocked post-claim `revoke_and_ack` monitor updates after the in-memory MPP
	// RAA blocker that created them is gone.
	let node_1_serialized = nodes[1].node.encode();
	let mon_a_serialized = get_monitor!(nodes[1], chan_id_a).encode();

	nodes[0].node.peer_disconnected(node_1_id);
	reload_node!(
		nodes[1],
		node_1_serialized,
		&[&mon_a_serialized, &mon_b_serialized],
		persister,
		new_chain_monitor,
		nodes_1_deserialized
	);

	// Reconnect both peers by manually exchanging `channel_reestablish`s. This avoids relying on a
	// more general reconnect helper while the channels intentionally have asymmetric monitor state.
	let node_1_id = nodes[1].node.get_our_node_id();
	nodes[0].node.peer_connected(node_1_id, &msgs::Init {
		features: nodes[1].node.init_features(), networks: None, remote_network_address: None,
	}, true).unwrap();
	nodes[1].node.peer_connected(node_0_id, &msgs::Init {
		features: nodes[0].node.init_features(), networks: None, remote_network_address: None,
	}, false).unwrap();

	let reestablish_0 = nodes[0].node.get_and_clear_pending_msg_events();
	let reestablish_1 = nodes[1].node.get_and_clear_pending_msg_events();
	let mut reestablish_0_chan_ids = Vec::new();
	let mut reestablish_1_chan_ids = Vec::new();
	for ev in &reestablish_1 {
		match ev {
			MessageSendEvent::SendChannelReestablish { node_id, msg } => {
				assert_eq!(*node_id, node_0_id);
				reestablish_1_chan_ids.push(msg.channel_id);
				nodes[0].node.handle_channel_reestablish(node_1_id, msg);
			},
			_ => panic!("Unexpected message: {:?}", ev),
		}
	}
	for ev in &reestablish_0 {
		match ev {
			MessageSendEvent::SendChannelReestablish { node_id, msg } => {
				assert_eq!(*node_id, node_1_id);
				reestablish_0_chan_ids.push(msg.channel_id);
				nodes[1].node.handle_channel_reestablish(node_0_id, msg);
			},
			_ => panic!("Unexpected message: {:?}", ev),
		}
	}
	// Both channels should reestablish in each direction.
	assert_eq!(reestablish_0_chan_ids.len(), 2);
	assert!(reestablish_0_chan_ids.contains(&chan_id_a));
	assert!(reestablish_0_chan_ids.contains(&chan_id_b));
	assert_eq!(reestablish_1_chan_ids.len(), 2);
	assert!(reestablish_1_chan_ids.contains(&chan_id_a));
	assert!(reestablish_1_chan_ids.contains(&chan_id_b));
	// Only nodes[1] was reloaded with stale monitor state. nodes[0] responds to the
	// `channel_reestablish`s without touching its monitors. nodes[1] applies the replayed channel B
	// preimage update, releases channel A's held RAA update, and frees channel A's held fulfill
	// during startup processing.
	check_added_monitors(&nodes[0], 0);
	check_added_monitors(&nodes[1], 3);

	// The first message batch after reconnect contains channel updates from both nodes. nodes[1]
	// also sends the channel A fulfill that startup processing released from the holding cell.
	let restart_msgs_0 = nodes[0].node.get_and_clear_pending_msg_events();
	let restart_msgs_1 = nodes[1].node.get_and_clear_pending_msg_events();
	let mut restart_scids_0 = Vec::new();
	let mut restart_scids_1 = Vec::new();
	let mut startup_fulfill_chan_ids = Vec::new();
	for ev in &restart_msgs_0 {
		match ev {
			MessageSendEvent::SendChannelUpdate { node_id, msg } => {
				assert_eq!(*node_id, node_1_id);
				restart_scids_0.push(msg.contents.short_channel_id);
			},
			_ => panic!("Unexpected restart message from node 0: {:?}", ev),
		}
	}
	for ev in &restart_msgs_1 {
		match ev {
			MessageSendEvent::SendChannelUpdate { node_id, msg } => {
				assert_eq!(*node_id, node_0_id);
				restart_scids_1.push(msg.contents.short_channel_id);
			},
			MessageSendEvent::UpdateHTLCs { node_id, channel_id, updates } => {
				assert_eq!(*node_id, node_0_id);
				startup_fulfill_chan_ids.push(*channel_id);
				// The released update must contain exactly the fulfill and nothing else.
				assert_eq!(updates.update_fulfill_htlcs.len(), 1);
				assert!(updates.update_add_htlcs.is_empty());
				assert!(updates.update_fail_htlcs.is_empty());
				assert!(updates.update_fail_malformed_htlcs.is_empty());
				assert!(updates.update_fee.is_none());
				for fulfill in &updates.update_fulfill_htlcs {
					nodes[0].node.handle_update_fulfill_htlc(node_1_id, fulfill.clone());
				}
				// Complete the standard commitment handshake for the released fulfill. The helper
				// checks nodes[0]'s incoming commitment monitor update, nodes[1]'s response monitor
				// updates, and nodes[0]'s held final monitor update.
				do_commitment_signed_dance(
					&nodes[0], &nodes[1], &updates.commitment_signed, false, false,
				);
			},
			_ => panic!("Unexpected restart message from node 1: {:?}", ev),
		}
	}
	assert_eq!(restart_scids_0.len(), 2);
	assert!(restart_scids_0.contains(&scid_a));
	assert!(restart_scids_0.contains(&scid_b));
	assert_eq!(restart_scids_1.len(), 2);
	assert!(restart_scids_1.contains(&scid_a));
	assert!(restart_scids_1.contains(&scid_b));
	// Only channel A's fulfill should have been released at startup.
	assert_eq!(startup_fulfill_chan_ids, vec![chan_id_a]);
	assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
	check_added_monitors(&nodes[0], 0);
	check_added_monitors(&nodes[1], 0);

	// Receiving the startup-released fulfill gives nodes[0] the payment preimage. That is enough to
	// emit `PaymentSent`, even though channel B's path-level success still needs its own fulfill.
	let startup_payment_events = nodes[0].node.get_and_clear_pending_events();
	assert_eq!(startup_payment_events.len(), 2);
	let mut saw_startup_payment_sent = false;
	let mut startup_success_scids = Vec::new();
	for ev in &startup_payment_events {
		match ev {
			Event::PaymentSent {
				payment_preimage: sent_preimage,
				payment_hash: sent_hash,
				amount_msat: sent_amount,
				fee_paid_msat,
				..
			} => {
				assert_eq!(*sent_preimage, payment_preimage);
				assert_eq!(*sent_hash, payment_hash);
				assert_eq!(*sent_amount, Some(amt_msat));
				// Direct (one-hop) payment, so no routing fees were paid.
				assert_eq!(*fee_paid_msat, Some(0));
				saw_startup_payment_sent = true;
			},
			Event::PaymentPathSuccessful { payment_hash: Some(path_hash), path, .. } => {
				assert_eq!(*path_hash, payment_hash);
				assert_eq!(path.hops.len(), 1);
				startup_success_scids.push(path.hops[0].short_channel_id);
			},
			_ => panic!("Unexpected startup payment event: {:?}", ev),
		}
	}
	assert!(saw_startup_payment_sent);
	// Only channel A's path has completed so far.
	assert_eq!(startup_success_scids, vec![scid_a]);

	// Handling the claim event runs the event-completion action that releases the remaining
	// RAA-blocked monitor update. The startup unblock path already released channel A, so channel B
	// is the only fulfill that should be emitted here.
	let claim_events = nodes[1].node.get_and_clear_pending_events();
	assert_eq!(claim_events.len(), 1);
	match &claim_events[0] {
		Event::PaymentClaimed { payment_hash: claimed_hash, amount_msat, htlcs, .. } => {
			assert_eq!(*claimed_hash, payment_hash);
			assert_eq!(*amount_msat, amt_msat);
			// Both MPP parts are represented in the claim.
			assert_eq!(htlcs.len(), 2);
		},
		_ => panic!("Unexpected event: {:?}", claim_events[0]),
	}
	// The `PaymentSent` event above releases the monitor update that nodes[0] held after the final
	// channel A startup revocation.
	check_added_monitors(&nodes[0], 1);
	// Handling `PaymentClaimed` releases channel B's held revocation update and then the fulfill
	// that was waiting behind it.
	check_added_monitors(&nodes[1], 2);

	// Channel A's fulfill was already sent during startup. The `PaymentClaimed` completion action
	// now frees channel B's held fulfill, and no other HTLC update should be bundled with it.
	let fulfill_msgs = nodes[1].node.get_and_clear_pending_msg_events();
	assert_eq!(fulfill_msgs.len(), 1);
	match &fulfill_msgs[0] {
		MessageSendEvent::UpdateHTLCs { node_id, channel_id, updates } => {
			assert_eq!(*node_id, node_0_id);
			assert_eq!(*channel_id, chan_id_b);
			assert_eq!(updates.update_fulfill_htlcs.len(), 1);
			assert!(updates.update_add_htlcs.is_empty());
			assert!(updates.update_fail_htlcs.is_empty());
			assert!(updates.update_fail_malformed_htlcs.is_empty());
			assert!(updates.update_fee.is_none());
			for fulfill in &updates.update_fulfill_htlcs {
				nodes[0].node.handle_update_fulfill_htlc(node_1_id, fulfill.clone());
			}
			// Complete the same commitment handshake for channel B. Here nodes[0]'s final monitor
			// update is persisted immediately because `PaymentSent` already ran for channel A.
			do_commitment_signed_dance(
				&nodes[0], &nodes[1], &updates.commitment_signed, false, false,
			);
		},
		_ => panic!("Unexpected fulfill message: {:?}", fulfill_msgs[0]),
	}
	check_added_monitors(&nodes[1], 0);
	assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());

	// The only remaining event is channel B's path-level success; `PaymentSent` already fired.
	let final_payment_events = nodes[0].node.get_and_clear_pending_events();
	assert_eq!(final_payment_events.len(), 1);
	match &final_payment_events[0] {
		Event::PaymentPathSuccessful { payment_hash: Some(path_hash), path, .. } => {
			assert_eq!(*path_hash, payment_hash);
			assert_eq!(path.hops.len(), 1);
			assert_eq!(path.hops[0].short_channel_id, scid_b);
		},
		_ => panic!("Unexpected final payment event: {:?}", final_payment_events[0]),
	}
	check_added_monitors(&nodes[0], 0);
	assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
	check_added_monitors(&nodes[0], 0);
	check_added_monitors(&nodes[1], 0);

	// Both MPP parts should have been fulfilled back to nodes[0]. If either channel still has a
	// pending outbound HTLC, its fulfill remained stuck in nodes[1]'s holding cell after reload.
	let pending: Vec<_> = nodes[0].node.list_channels().iter()
		.filter(|channel| channel.channel_id == chan_id_a || channel.channel_id == chan_id_b)
		.filter(|channel| !channel.pending_outbound_htlcs.is_empty())
		.map(|channel| channel.channel_id)
		.collect();
	assert!(pending.is_empty(), "HTLC fulfills remained stuck on channels {:?}", pending);
}
1291+
9401292fn do_forwarded_payment_no_manager_persistence ( use_cs_commitment : bool , claim_htlc : bool , use_intercept : bool ) {
9411293 if !use_cs_commitment { assert ! ( !claim_htlc) ; }
9421294 // If we go to forward a payment, and the ChannelMonitor persistence completes, but the
0 commit comments