@@ -986,8 +986,7 @@ impl CheckpointStore {
986
986
}
987
987
988
988
/// Updates the store on the basis of transactions that have been processed. This is idempotent
989
- /// and nothing unsafe happens if it is called twice. Returns the lowest checkpoint number with
990
- /// unprocessed transactions (this is the low watermark).
989
+ /// and nothing unsafe happens if it is called twice.
991
990
fn update_processed_transactions (
992
991
& mut self , // We take by &mut to prevent concurrent access.
993
992
transactions : & [ ( TxSequenceNumber , ExecutionDigests ) ] ,
@@ -996,67 +995,34 @@ impl CheckpointStore {
996
995
. tables
997
996
. transactions_to_checkpoint
998
997
. multi_get ( transactions. iter ( ) . map ( |( _, tx) | tx) ) ?;
999
-
1000
- let batch = self . tables . transactions_to_checkpoint . batch ( ) ;
1001
-
1002
- // Check we are not re-proposing the same transactions that are already in a
1003
- // final checkpoint. This should not be possible since we only accept (sign /
1004
- // record) a checkpoint if we have already processed all transactions within.
1005
- let already_in_checkpoint_tx = transactions
998
+ let already_in_checkpoint_tx: Vec < _ > = transactions
1006
999
. iter ( )
1007
1000
. zip ( & in_checkpoint)
1008
- . filter_map (
1009
- |( ( _seq, tx) , in_chk) | {
1010
- if in_chk. is_some ( ) {
1011
- Some ( tx)
1012
- } else {
1013
- None
1014
- }
1015
- } ,
1016
- )
1017
- . count ( ) ;
1018
-
1019
- if already_in_checkpoint_tx != 0 {
1020
- return Err ( SuiError :: CheckpointingError {
1021
- error : "Processing transactions already in a checkpoint." . to_string ( ) ,
1022
- } ) ;
1001
+ . filter_map ( |( ( _seq, tx) , in_chk) | in_chk. map ( |_| tx) )
1002
+ . collect ( ) ;
1003
+ if !already_in_checkpoint_tx. is_empty ( ) {
1004
+ // This should never happen, but if it happens, we should not let it block checkpoint
1005
+ // progress either. Log the error so that we can keep track.
1006
+ error ! (
1007
+ "Transactions are processed and updated from batch more than once: {:?}" ,
1008
+ already_in_checkpoint_tx
1009
+ ) ;
1023
1010
}
1024
1011
1025
- // Update the entry to the transactions_to_checkpoint
1026
-
1012
+ let batch = self . tables . extra_transactions . batch ( ) ;
1027
1013
let batch = batch. insert_batch (
1028
- & self . tables . transactions_to_checkpoint ,
1029
- transactions
1030
- . iter ( )
1031
- . zip ( & in_checkpoint)
1032
- . filter_map ( |( ( seq, tx) , in_chk) | {
1033
- if in_chk. is_some ( ) {
1034
- Some ( ( tx, ( in_chk. unwrap ( ) . 0 , * seq) ) )
1035
- } else {
1036
- None
1037
- }
1038
- } ) ,
1014
+ & self . tables . extra_transactions ,
1015
+ transactions. iter ( ) . map ( |( seq, digest) | ( digest, seq) ) ,
1039
1016
) ?;
1040
1017
1041
- // If the transactions processed did not belong to a checkpoint yet, we add them to the list
1042
- // of `extra` transactions, that we should be actively propagating to others.
1043
- let new_extra: Vec < _ > = transactions
1044
- . iter ( )
1045
- . zip ( & in_checkpoint)
1046
- . filter_map ( |( ( seq, tx) , in_chk) | {
1047
- if in_chk. is_none ( ) {
1048
- Some ( ( tx, seq) )
1049
- } else {
1050
- None
1051
- }
1052
- } )
1053
- . collect ( ) ;
1054
- debug ! ( "Transactions added to extra_transactions: {:?}" , new_extra) ;
1055
- let batch = batch. insert_batch ( & self . tables . extra_transactions , new_extra) ?;
1056
-
1057
1018
// Write to the database.
1058
1019
batch. write ( ) ?;
1059
1020
1021
+ debug ! (
1022
+ "Transactions added to extra_transactions: {:?}" ,
1023
+ transactions
1024
+ ) ;
1025
+
1060
1026
Ok ( ( ) )
1061
1027
}
1062
1028
}
0 commit comments