|
16 | 16 | //! Functions translating raw OKX WebSocket frames into Nautilus data types. |
17 | 17 |
|
18 | 18 | use ahash::AHashMap; |
19 | | -use nautilus_core::nanos::UnixNanos; |
| 19 | +use nautilus_core::{UUID4, nanos::UnixNanos}; |
20 | 20 | use nautilus_model::{ |
21 | 21 | data::{ |
22 | 22 | Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate, |
@@ -644,8 +644,12 @@ pub fn parse_order_msg( |
644 | 644 |
|
645 | 645 | let previous_fee = fee_cache.get(&msg.ord_id).copied(); |
646 | 646 |
|
| 647 | + // Only generate fill reports when there's actual new fill data |
| 648 | + // Check if fillSz is non-zero/non-empty OR trade_id is present |
| 649 | + let has_new_fill = (!msg.fill_sz.is_empty() && msg.fill_sz != "0") || !msg.trade_id.is_empty(); |
| 650 | + |
647 | 651 | match msg.state { |
648 | | - OKXOrderStatus::Filled | OKXOrderStatus::PartiallyFilled => { |
| 652 | + OKXOrderStatus::Filled | OKXOrderStatus::PartiallyFilled if has_new_fill => { |
649 | 653 | parse_fill_report(msg, instrument, account_id, previous_fee, ts_init) |
650 | 654 | .map(ExecutionReport::Fill) |
651 | 655 | } |
@@ -804,7 +808,6 @@ pub fn parse_order_status_report( |
804 | 808 | let size_precision = instrument.size_precision(); |
805 | 809 | let quantity = parse_quantity(&msg.sz, size_precision)?; |
806 | 810 | let filled_qty = parse_quantity(&msg.acc_fill_sz.clone().unwrap_or_default(), size_precision)?; |
807 | | - |
808 | 811 | let ts_accepted = parse_millisecond_timestamp(msg.c_time); |
809 | 812 | let ts_last = parse_millisecond_timestamp(msg.u_time); |
810 | 813 |
|
@@ -904,16 +907,65 @@ pub fn parse_fill_report( |
904 | 907 | ) -> anyhow::Result<FillReport> { |
905 | 908 | let client_order_id = parse_client_order_id(&msg.cl_ord_id); |
906 | 909 | let venue_order_id = VenueOrderId::new(msg.ord_id); |
907 | | - let trade_id = TradeId::from(msg.trade_id.as_str()); |
| 910 | + |
| 911 | + // TODO: Extract to dedicated function: |
| 912 | + // OKX may not provide a trade_id, so generate a UUID4 as fallback |
| 913 | + let trade_id = if msg.trade_id.is_empty() { |
| 914 | + TradeId::from(UUID4::new().to_string().as_str()) |
| 915 | + } else { |
| 916 | + TradeId::from(msg.trade_id.as_str()) |
| 917 | + }; |
| 918 | + |
908 | 919 | let order_side: OrderSide = msg.side.into(); |
909 | 920 |
|
910 | 921 | let price_precision = instrument.price_precision(); |
911 | 922 | let size_precision = instrument.size_precision(); |
912 | | - let last_px = parse_price(&msg.fill_px, price_precision)?; |
913 | | - let last_qty = parse_quantity(&msg.fill_sz, size_precision)?; |
| 923 | + |
| 924 | + // TODO: Extract to dedicated function: |
| 925 | + // OKX may not provide fillPx for some orders, fall back to avgPx or lastPx |
| 926 | + let price_str = if !msg.fill_px.is_empty() { |
| 927 | + &msg.fill_px |
| 928 | + } else if !msg.avg_px.is_empty() { |
| 929 | + &msg.avg_px |
| 930 | + } else { |
| 931 | + &msg.px // Last resort, use order price |
| 932 | + }; |
| 933 | + let last_px = parse_price(price_str, price_precision).map_err(|e| { |
| 934 | + anyhow::anyhow!( |
| 935 | + "Failed to parse price (fill_px='{}', avg_px='{}', px='{}'): {}", |
| 936 | + msg.fill_px, |
| 937 | + msg.avg_px, |
| 938 | + msg.px, |
| 939 | + e |
| 940 | + ) |
| 941 | + })?; |
| 942 | + |
| 943 | + // TODO: Extract to dedicated function: |
| 944 | + // OKX may not provide fillSz for some orders, fall back to accFillSz (accumulated fill size) |
| 945 | + let qty_str = if !msg.fill_sz.is_empty() && msg.fill_sz != "0" { |
| 946 | + &msg.fill_sz |
| 947 | + } else if let Some(ref acc_fill_sz) = msg.acc_fill_sz { |
| 948 | + if !acc_fill_sz.is_empty() && acc_fill_sz != "0" { |
| 949 | + acc_fill_sz |
| 950 | + } else { |
| 951 | + &msg.sz // Last resort, use order size |
| 952 | + } |
| 953 | + } else { |
| 954 | + &msg.sz // Last resort, use order size |
| 955 | + }; |
| 956 | + let last_qty = parse_quantity(qty_str, size_precision).map_err(|e| { |
| 957 | + anyhow::anyhow!( |
| 958 | + "Failed to parse quantity (fill_sz='{}', acc_fill_sz={:?}, sz='{}'): {}", |
| 959 | + msg.fill_sz, |
| 960 | + msg.acc_fill_sz, |
| 961 | + msg.sz, |
| 962 | + e |
| 963 | + ) |
| 964 | + })?; |
914 | 965 |
|
915 | 966 | let fee_currency = Currency::from(&msg.fee_ccy); |
916 | | - let total_fee = parse_fee(msg.fee.as_deref(), fee_currency)?; |
| 967 | + let total_fee = parse_fee(msg.fee.as_deref(), fee_currency) |
| 968 | + .map_err(|e| anyhow::anyhow!("Failed to parse fee={:?}: {}", msg.fee, e))?; |
917 | 969 | let commission = if let Some(previous_fee) = previous_fee { |
918 | 970 | total_fee - previous_fee |
919 | 971 | } else { |
|
0 commit comments