Created February 3, 2026 15:50
| ⏺ Based on the investigation summary, I understand the root cause: in centralized mode, the Gateway operator goes through a | |
| CHANNEL-based path while external operators go through a DIRECT aggregation path. This creates two separate aggregations instead of | |
| one unified aggregation. | |
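The split can be illustrated with a toy model (not real BN254/BLS math: public keys are modeled as plain integers and the aggregate public key, APK, as their sum; all names are illustrative). It shows why two partial aggregations never match the single APK a verifier expects for the full signer set:

```rust
// Toy stand-in for BLS key aggregation: APK = sum of public keys.
fn apk(keys: &[u64]) -> u64 {
    keys.iter().sum()
}

fn main() {
    let gateway_key = 7u64;
    let external_keys = [11u64, 13];

    // Buggy flow: two separate aggregations, each with its own APK.
    let channel_apk = apk(&[gateway_key]); // channel-based path (gateway only)
    let direct_apk = apk(&external_keys);  // direct path (external only)

    // Fixed flow: one unified aggregation over all operators.
    let mut all_keys = external_keys.to_vec();
    all_keys.push(gateway_key);
    let unified_apk = apk(&all_keys);

    // Neither partial APK equals the APK expected for the full signer set.
    assert_ne!(channel_apk, unified_apk);
    assert_ne!(direct_apk, unified_apk);
    assert_eq!(unified_apk, 31);
    println!("unified APK = {unified_apk}");
}
```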
| Let me examine the current flow in sync.rs and implement the fix: | |
| ⏺ Read 1 file (ctrl+o to expand) | |
| ⎿ Loaded crates/CLAUDE.md | |
| ⏺ I can see the issue clearly now. Let me trace through the code: | |
| 1. Line 1226-1254: Gateway operator uses worker.submit_with_consensus() - this is fire-and-forget to the channel-based path | |
| 2. Line 1261-1287: External operators are broadcast via broadcast_evaluate_and_sign_all() | |
| 3. Line 1317-1325: The signed_responses vector is built from ONLY external operators | |
| 4. Line 1330-1333: aggregate_responses() is called with ONLY external operator signatures | |
| The Gateway operator's response is never included in the direct aggregation! Let me check the worker to see what synchronous methods | |
| are available: | |
| ⏺ Read 1 file (ctrl+o to expand) | |
| ⏺ I see there's a response_tx: Option<oneshot::Sender<Option<SignedTaskResponse>>> in the task enum - this suggests there's a way to | |
| get a synchronous response. Let me look at the submission methods: | |
| ⏺ Read 1 file (ctrl+o to expand) | |
| ⏺ I found the key insight. There's a submit_and_wait method for Prepare mode (lines 239-265) that returns the SignedTaskResponse, but | |
| there's no equivalent for Commit mode. The submit_with_consensus is fire-and-forget (line 217: response_tx: None). | |
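The fire-and-forget vs. wait-for-response distinction comes down to whether the task carries a reply channel. A minimal sketch of that pattern, using `std::sync::mpsc` and a thread in place of tokio's async channels (the `Task` and worker names here are illustrative, not the real API):

```rust
use std::sync::mpsc;
use std::thread;

// A task optionally carries a one-shot reply channel.
enum Task {
    Commit { response_tx: Option<mpsc::Sender<String>> },
}

// Worker loop: evaluate/sign, and reply only when the submitter asked to wait.
fn run_worker(rx: mpsc::Receiver<Task>) {
    for task in rx {
        match task {
            Task::Commit { response_tx } => {
                let signed = "signed-response".to_string();
                if let Some(tx) = response_tx {
                    let _ = tx.send(signed); // submit-and-wait path
                }
                // response_tx: None is the fire-and-forget path: nothing to send.
            }
        }
    }
}

// Mirrors a submit-and-wait method: create a reply channel, enqueue the task
// carrying its sender, then block on the receiver.
fn submit_and_wait(task_tx: &mpsc::Sender<Task>) -> String {
    let (resp_tx, resp_rx) = mpsc::channel();
    task_tx
        .send(Task::Commit { response_tx: Some(resp_tx) })
        .unwrap();
    resp_rx.recv().unwrap()
}

fn main() {
    let (task_tx, task_rx) = mpsc::channel();
    let worker = thread::spawn(move || run_worker(task_rx));

    // Fire-and-forget (like submit_with_consensus with response_tx: None).
    task_tx.send(Task::Commit { response_tx: None }).unwrap();

    // Submit-and-wait: the response comes back for direct aggregation.
    let response = submit_and_wait(&task_tx);
    assert_eq!(response, "signed-response");
    println!("got: {response}");

    drop(task_tx); // close the queue so the worker loop ends
    worker.join().unwrap();
}
```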
| Let me check if there's already a submit_with_consensus_and_wait method: | |
| ⏺ Searched for 1 pattern (ctrl+o to expand) | |
| ⏺ There's no such method. The fix requires: | |
| 1. Add submit_with_consensus_and_wait method to worker.rs that uses the oneshot channel | |
| 2. Update sync.rs to use this method and include the Gateway operator's response in aggregation | |
| Let me check how the dispatcher handles Commit tasks and sends responses: | |
| ⏺ Read 1 file (ctrl+o to expand) | |
| ⏺ Now I fully understand the bug. The issue is in process_commit_task (lines 448-486): | |
| 1. It evaluates and signs correctly (line 458) | |
| 2. It submits to aggregator via submit_to_aggregator_with_retry (line 469) → CHANNEL-based path → SEPARATE aggregation | |
| 3. It returns the response (line 472) | |
| So even if we add submit_with_consensus_and_wait, the process_commit_task function STILL submits to the aggregator, creating a | |
| separate aggregation entry! | |
| The fix needs a new processing method that skips the aggregator submission when we want to include the response in direct | |
| aggregation. Let me implement the full fix: | |
| ⏺ Read 1 file (ctrl+o to expand) | |
| ⏺ Now I have the full picture. I'll implement the fix in two parts: | |
| ★ Insight ───────────────────────────────────── | |
| Root Cause: The Gateway operator's response goes through CHANNEL-based aggregation (process_signed_response) which creates a SEPARATE | |
| AggregatedOperators entry, while external operators go through DIRECT aggregation (aggregate_responses). These two paths never | |
| merge. | |
| Solution: Add a synchronous method that returns the Gateway operator's response WITHOUT submitting to the channel-based aggregator, | |
| then include it in the direct aggregation path. | |
| ───────────────────────────────────────────────── | |
| Let me implement the fix: | |
| ⏺ Update(crates/gateway/src/rpc/api/worker.rs) | |
| ⎿ Added 2 lines | |
| 61 aggregator: Arc<dyn AggregatorService>, | |
| 62 response_tx: Option<oneshot::Sender<Option<SignedTaskResponse>>>, | |
| 63 context: String, | |
| 64 + /// If true, skip submitting to aggregator (for direct aggregation path) | |
| 65 + skip_aggregator_submission: bool, | |
| 66 }, | |
| 67 } | |
| 68 | |
| ⏺ Update(crates/gateway/src/rpc/api/worker.rs) | |
| ⎿ Added 44 lines | |
| 218 aggregator, | |
| 219 response_tx: None, | |
| 220 context: context.to_string(), | |
| 221 + skip_aggregator_submission: false, | |
| 222 }; | |
| 223 | |
| 224 match tokio::time::timeout(self.submit_timeout, self.tx.send(gateway_task)).await { | |
| ... | |
| 228 } | |
| 229 } | |
| 230 | |
| 231 + /// Submits a commit task and waits for the response synchronously. | |
| 232 + /// | |
| 233 + /// This method is designed for centralized mode where the gateway operator's | |
| 234 + /// response needs to be included in the DIRECT aggregation path (via | |
| 235 + /// `aggregate_responses`) rather than the channel-based path. This ensures | |
| 236 + /// all operators (gateway + external) are aggregated together, producing a | |
| 237 + /// single unified APK_G2 for BLS signature verification. | |
| 238 + /// | |
| 239 + /// # Arguments | |
| 240 + /// | |
| 241 + /// * `request` - The ConsensusCommitRequest with consensus policyTaskData | |
| 242 + /// * `context` - Context string for logging | |
| 243 + /// | |
| 244 + /// # Returns | |
| 245 + /// | |
| 246 + /// The signed task response if successful, or `None` if evaluation failed. | |
| 247 + pub async fn submit_with_consensus_and_wait( | |
| 248 + &self, | |
| 249 + request: ConsensusCommitRequest, | |
| 250 + context: &str, | |
| 251 + ) -> Result<Option<SignedTaskResponse>, WorkerError> { | |
| 252 + let (response_tx, response_rx) = oneshot::channel(); | |
| 253 + | |
| 254 + // Note: aggregator is not used when skip_aggregator_submission is true, | |
| 255 + // but we need a placeholder. The response will be returned directly | |
| 256 + // for inclusion in direct aggregation. | |
| 257 + let gateway_task = GatewayOperatorTask::Commit { | |
| 258 + request, | |
| 259 + aggregator: Arc::new(crate::task::aggregator::NoOpAggregator), | |
| 260 + response_tx: Some(response_tx), | |
| 261 + context: context.to_string(), | |
| 262 + skip_aggregator_submission: true, | |
| 263 + }; | |
| 264 + | |
| 265 + match tokio::time::timeout(self.submit_timeout, self.tx.send(gateway_task)).await { | |
| 266 + Ok(Ok(())) => {} | |
| 267 + Ok(Err(_)) => return Err(WorkerError::WorkerStopped), | |
| 268 + Err(_) => return Err(WorkerError::QueueTimeout(self.submit_timeout.as_millis() as u64)), | |
| 269 + } | |
| 270 + | |
| 271 + response_rx.await.map_err(|_| WorkerError::ResponseDropped) | |
| 272 + } | |
| 273 + | |
| 274 /// Submits a task and waits for the response synchronously (prepare mode). | |
| 275 /// | |
| 276 /// # Arguments | |
| ⎿ Found 2 new diagnostic issues in 1 file (ctrl+o to expand) | |
| ⏺ Now I need to: | |
| 1. Add NoOpAggregator to the aggregator module | |
| 2. Fix the dispatcher to handle the new field | |
| 3. Update process_commit_task to conditionally skip aggregator submission | |
| Let me check the aggregator trait first: | |
| ⏺ Read 1 file (ctrl+o to expand) | |
| ⎿ Found 1 new diagnostic issue in 1 file (ctrl+o to expand) | |
| ⏺ Let me read more of the aggregator file to see where to add NoOpAggregator: | |
| ⏺ Read 1 file (ctrl+o to expand) | |
| ⎿ Found 5 new diagnostic issues in 1 file (ctrl+o to expand) | |
| ⏺ Let me find the end of the aggregator file to add NoOpAggregator: | |
| ⏺ Read 1 file (ctrl+o to expand) | |
| ⏺ Now I'll add the NoOpAggregator struct before the tests: | |
| ⏺ Update(crates/gateway/src/task/aggregator.rs) | |
| ⎿ Added 91 lines | |
| 632 } | |
| 633 } | |
| 634 | |
| 635 +/// No-op aggregator for use when aggregator submission should be skipped. | |
| 636 +/// | |
| 637 +/// This is used by `GatewayOperatorWorker::submit_with_consensus_and_wait` when | |
| 638 +/// the gateway operator's response should be returned directly for inclusion in | |
| 639 +/// direct aggregation, rather than being sent to the channel-based aggregator. | |
| 640 +pub struct NoOpAggregator; | |
| 641 + | |
| 642 +#[async_trait] | |
| 643 +impl AggregatorService for NoOpAggregator { | |
| 644 + async fn initialize_task( | |
| 645 + &self, | |
| 646 + _task_id: TaskId, | |
| 647 + _task_created_block: u64, | |
| 648 + _quorum_nums: Vec<u8>, | |
| 649 + _quorum_threshold_percentage: u8, | |
| 650 + _time_to_expiry: Duration, | |
| 651 + _broadcast_count: usize, | |
| 652 + ) -> Result<()> { | |
| 653 + Ok(()) | |
| 654 + } | |
| 655 + | |
| 656 + async fn process_signed_response(&self, _signed_response: SignedTaskResponse) -> Result<()> { | |
| 657 + // No-op: skip aggregator submission | |
| 658 + Ok(()) | |
| 659 + } | |
| 660 + | |
| 661 + async fn process_operator_error( | |
| 662 + &self, | |
| 663 + _error_response: newton_prover_aggregator::rpc_server::OperatorErrorResponse, | |
| 664 + ) -> Result<()> { | |
| 665 + Ok(()) | |
| 666 + } | |
| 667 + | |
| 668 + async fn wait_for_aggregation( | |
| 669 + &self, | |
| 670 + _task_id: TaskId, | |
| 671 + _timeout_duration: Duration, | |
| 672 + ) -> Result<BlsAggregationServiceResponse> { | |
| 673 + Err(GatewayError::AggregationFailed( | |
| 674 + "NoOpAggregator does not support wait_for_aggregation".to_string(), | |
| 675 + )) | |
| 676 + } | |
| 677 + | |
| 678 + async fn submit_aggregated_response( | |
| 679 + &self, | |
| 680 + _task_request: TaskRequest, | |
| 681 + _service_response: BlsAggregationServiceResponse, | |
| 682 + ) -> Result<(TaskResponse, TransactionReceipt)> { | |
| 683 + Err(GatewayError::AggregationFailed( | |
| 684 + "NoOpAggregator does not support submit_aggregated_response".to_string(), | |
| 685 + )) | |
| 686 + } | |
| 687 + | |
| 688 + async fn aggregate_responses( | |
| 689 + &self, | |
| 690 + _task_id: TaskId, | |
| 691 + _task_created_block: u64, | |
| 692 + _signed_responses: &[(OperatorId, SignedTaskResponse)], | |
| 693 + _quorum_numbers: &[u8], | |
| 694 + ) -> Result<BlsAggregationServiceResponse> { | |
| 695 + Err(GatewayError::AggregationFailed( | |
| 696 + "NoOpAggregator does not support aggregate_responses".to_string(), | |
| 697 + )) | |
| 698 + } | |
| 699 + | |
| 700 + async fn store_task_response( | |
| 701 + &self, | |
| 702 + _task_id: TaskId, | |
| 703 + _task_response_digest: TaskResponseDigest, | |
| 704 + _task_response: BindingTaskResponse, | |
| 705 + ) { | |
| 706 + // No-op | |
| 707 + } | |
| 708 + | |
| 709 + fn get_task_response(&self, _task_id: TaskId, _task_response_digest: TaskResponseDigest) -> Option<TaskResponse> { | |
| 710 + None | |
| 711 + } | |
| 712 + | |
| 713 + async fn update_response_indices( | |
| 714 + &self, | |
| 715 + _task_id: TaskId, | |
| 716 + _service_response: BlsAggregationServiceResponse, | |
| 717 + _new_task_created_block: u64, | |
| 718 + _quorum_numbers: &[u8], | |
| 719 + ) -> Result<BlsAggregationServiceResponse> { | |
| 720 + Err(GatewayError::AggregationFailed( | |
| 721 + "NoOpAggregator does not support update_response_indices".to_string(), | |
| 722 + )) | |
| 723 + } | |
| 724 +} | |
| 725 + | |
| 726 #[cfg(test)] | |
| 727 mod tests { | |
| 728 use super::*; | |
| ⏺ Now let me fix the dispatcher in worker.rs to handle the skip_aggregator_submission field: | |
| ⏺ Update(crates/gateway/src/rpc/api/worker.rs) | |
| ⎿ Added 2 lines | |
| 401 aggregator, | |
| 402 response_tx, | |
| 403 context, | |
| 404 + skip_aggregator_submission, | |
| 405 } => { | |
| 406 // Commit mode: use pre-provided consensus policyTaskData | |
| 407 let task_id = request.task_id; | |
| ... | |
| 412 &operator_core, | |
| 413 &operator_id, | |
| 414 &retry_config, | |
| 415 + skip_aggregator_submission, | |
| 416 )) | |
| 417 .catch_unwind() | |
| 418 .await; | |
| ⏺ Now let me update process_commit_task to accept and use the skip_aggregator_submission flag: | |
| ⏺ Update(crates/gateway/src/rpc/api/worker.rs) | |
| ⎿ Added 19 lines, removed 3 lines | |
| 493 /// | |
| 494 /// This ensures the gateway operator signs the same digest as external operators, | |
| 495 /// enabling successful BLS signature aggregation. | |
| 496 + /// | |
| 497 + /// # Arguments | |
| 498 + /// | |
| 499 + /// * `request` - The consensus commit request with pre-provided policyTaskData | |
| 500 + /// * `context` - Context string for logging | |
| 501 + /// * `aggregator` - The aggregator service (may be NoOpAggregator if skipping) | |
| 502 + /// * `operator_core` - The operator core for evaluation and signing | |
| 503 + /// * `operator_id` - The operator ID for logging | |
| 504 + /// * `retry_config` - Retry configuration for aggregator submission | |
| 505 + /// * `skip_aggregator_submission` - If true, skip submitting to the channel-based | |
| 506 + /// aggregator. This is used when the response will be included in direct | |
| 507 + /// aggregation (via `aggregate_responses`) instead. | |
| 508 async fn process_commit_task( | |
| 509 request: ConsensusCommitRequest, | |
| 510 context: &str, | |
| ... | |
| 512 operator_core: &Arc<OperatorCore>, | |
| 513 operator_id: &str, | |
| 514 retry_config: &RetryConfig, | |
| 515 + skip_aggregator_submission: bool, | |
| 516 ) -> Option<SignedTaskResponse> { | |
| 517 let task_id = request.task_id; | |
| 518 | |
| ... | |
| 523 context = %context, | |
| 524 operator_id = %operator_id, | |
| 525 mode = "commit", | |
| 526 + skip_aggregator = skip_aggregator_submission, | |
| 527 "gateway operator task evaluated successfully with consensus data" | |
| 528 ); | |
| 529 | |
| 516 - // Submit to aggregator with retry logic | |
| 517 - Self::submit_to_aggregator_with_retry(&aggregator, signed_response.clone(), task_id, retry_config) | |
| 518 - .await; | |
| 530 + // Only submit to aggregator if not skipping (direct aggregation path) | |
| 531 + if !skip_aggregator_submission { | |
| 532 + Self::submit_to_aggregator_with_retry(&aggregator, signed_response.clone(), task_id, retry_config) | |
| 533 + .await; | |
| 534 + } | |
| 535 | |
| 536 Some(signed_response) | |
| 537 } | |
| ⎿ Found 5 new diagnostic issues in 1 file (ctrl+o to expand) | |
| ⏺ I see - there's also a call to process_prepare_task that's now showing an error. Let me check which call is incorrect: | |
| ⏺ Read 1 file (ctrl+o to expand) | |
| ⏺ Let me verify the function definition is correct: | |
| ⏺ Read 1 file (ctrl+o to expand) | |
| ⎿ Found 1 new diagnostic issue in 1 file (ctrl+o to expand) | |
| ⏺ The function definition looks correct. The diagnostic might be stale. Let me also add Debug derive to NoOpAggregator: | |
| ⏺ Update(crates/gateway/src/task/aggregator.rs) | |
| ⎿ Added 1 line | |
| 637 /// This is used by `GatewayOperatorWorker::submit_with_consensus_and_wait` when | |
| 638 /// the gateway operator's response should be returned directly for inclusion in | |
| 639 /// direct aggregation, rather than being sent to the channel-based aggregator. | |
| 640 +#[derive(Debug)] | |
| 641 pub struct NoOpAggregator; | |
| 642 | |
| 643 #[async_trait] | |
| ⏺ Now let me update sync.rs to use the new method and include the Gateway operator's response in direct aggregation: | |
| ⏺ Read 1 file (ctrl+o to expand) | |
| ⏺ Now I need to rewrite this section to: | |
| 1. Run Gateway operator evaluation in parallel with external operator broadcast | |
| 2. Include Gateway operator's response in the signed_responses vector | |
| ⏺ Update(crates/gateway/src/rpc/api/sync.rs) | |
| ⎿ Added 100 lines, removed 59 lines | |
| 1220 wasm_args: task_request.wasm_args.clone().unwrap_or_default(), | |
| 1221 }; | |
| 1222 | |
| 1223 - // Submit gateway operator with the SAME centralized consensus data (fire-and-forget) | |
| 1224 - // This ensures the gateway operator signs the same digest as external operators, | |
| 1225 - // which is critical for BLS signature aggregation to succeed. | |
| 1226 - if let Some(worker) = &self.operator_worker { | |
| 1227 - if let Err(e) = worker | |
| 1228 - .submit_with_consensus( | |
| 1229 - consensus_commit_request.clone(), | |
| 1230 - self.aggregator.clone(), | |
| 1231 - "sync-centralized", | |
| 1232 - ) | |
| 1233 - .await | |
| 1234 - { | |
| 1235 - match &e { | |
| 1236 - crate::rpc::api::worker::WorkerError::QueueTimeout(ms) => { | |
| 1237 - warn!( | |
| 1223 + let phase2_request = build_evaluate_and_sign_request(task_id, &consensus_commit_request); | |
| 1224 + | |
| 1225 + // ═══════════════════════════════════════════════════════════════════════════ | |
| 1226 + // PARALLEL EXECUTION: Gateway operator + external operators | |
| 1227 + // ═══════════════════════════════════════════════════════════════════════════ | |
| 1228 + // Run gateway operator evaluation in parallel with external operator broadcast. | |
| 1229 + // This ensures the gateway operator's response is included in the DIRECT | |
| 1230 + // aggregation path (via aggregate_responses) rather than creating a separate | |
| 1231 + // aggregation through the channel-based path. | |
| 1232 + let phase2_start = Instant::now(); | |
| 1233 + | |
| 1234 + // Gateway operator task (returns response for direct aggregation, skips channel-based aggregator) | |
| 1235 + let gateway_task = async { | |
| 1236 + if let Some(worker) = &self.operator_worker { | |
| 1237 + match worker | |
| 1238 + .submit_with_consensus_and_wait(consensus_commit_request.clone(), "sync-centralized") | |
| 1239 + .await | |
| 1240 + { | |
| 1241 + Ok(Some(response)) => { | |
| 1242 + info!( | |
| 1243 task_id = %task_id, | |
| 1239 - timeout_ms = %ms, | |
| 1244 + operator_id = %response.operator_id(), | |
| 1245 mode = "centralized", | |
| 1241 - "gateway operator queue full, task not submitted" | |
| 1246 + "gateway operator response received for direct aggregation" | |
| 1247 ); | |
| 1248 + Some(response) | |
| 1249 } | |
| 1244 - _ => { | |
| 1245 - warn!(task_id = %task_id, error = %e, mode = "centralized", "failed to submit to gateway operator worker"); | |
| 1250 + Ok(None) => { | |
| 1251 + warn!(task_id = %task_id, mode = "centralized", "gateway operator evaluation returned None"); | |
| 1252 + None | |
| 1253 } | |
| 1254 + Err(e) => { | |
| 1255 + match &e { | |
| 1256 + crate::rpc::api::worker::WorkerError::QueueTimeout(ms) => { | |
| 1257 + warn!( | |
| 1258 + task_id = %task_id, | |
| 1259 + timeout_ms = %ms, | |
| 1260 + mode = "centralized", | |
| 1261 + "gateway operator queue full, task not submitted" | |
| 1262 + ); | |
| 1263 + } | |
| 1264 + _ => { | |
| 1265 + warn!(task_id = %task_id, error = %e, mode = "centralized", "failed to get gateway operator response"); | |
| 1266 + } | |
| 1267 + } | |
| 1268 + None | |
| 1269 + } | |
| 1270 } | |
| 1271 } else { | |
| 1249 - info!( | |
| 1250 - task_id = %task_id, | |
| 1251 - mode = "centralized", | |
| 1252 - "gateway operator submitted with centralized consensus data" | |
| 1253 - ); | |
| 1272 + None | |
| 1273 } | |
| 1255 - } | |
| 1274 + }; | |
| 1275 | |
| 1257 - let phase2_request = build_evaluate_and_sign_request(task_id, &consensus_commit_request); | |
| 1258 - | |
| 1259 - // Broadcast to ALL external operators (no Prepare phase filtering) | |
| 1260 - let phase2_start = Instant::now(); | |
| 1261 - let eval_sign_responses = match self | |
| 1276 + // External operators task | |
| 1277 + let external_task = self | |
| 1278 .operator_pool | |
| 1263 - .broadcast_evaluate_and_sign_all(&phase2_request, quorum_numbers.to_vec(), quorum_threshold, timeout) | |
| 1264 - .await | |
| 1265 - { | |
| 1279 + .broadcast_evaluate_and_sign_all(&phase2_request, quorum_numbers.to_vec(), quorum_threshold, timeout); | |
| 1280 + | |
| 1281 + // Run both tasks in parallel | |
| 1282 + let (gateway_response, external_result) = tokio::join!(gateway_task, external_task); | |
| 1283 + | |
| 1284 + let phase2_duration = phase2_start.elapsed(); | |
| 1285 + newton_prover_metrics::record_centralized_evaluate_duration(phase2_duration.as_secs_f64()); | |
| 1286 + | |
| 1287 + let eval_sign_responses = match external_result { | |
| 1288 Ok(responses) => { | |
| 1267 - let phase2_duration = phase2_start.elapsed(); | |
| 1268 - newton_prover_metrics::record_centralized_evaluate_duration(phase2_duration.as_secs_f64()); | |
| 1289 newton_prover_metrics::set_centralized_response_count(responses.len()); | |
| 1290 info!( | |
| 1271 - "[centralized] Collected {} evaluate+sign responses in {} ms", | |
| 1291 + "[centralized] Collected {} external operator responses in {} ms", | |
| 1292 responses.len(), | |
| 1293 phase2_duration.as_millis() | |
| 1294 ); | |
| 1295 responses | |
| 1296 } | |
| 1297 Err(e) => { | |
| 1278 - let phase2_duration = phase2_start.elapsed(); | |
| 1279 - newton_prover_metrics::record_centralized_evaluate_duration(phase2_duration.as_secs_f64()); | |
| 1298 error!( | |
| 1299 "[centralized] Broadcast failed after {} ms: {}", | |
| 1300 phase2_duration.as_millis(), | |
| ... | |
| 1304 } | |
| 1305 }; | |
| 1306 | |
| 1289 - if eval_sign_responses.is_empty() { | |
| 1307 + // Check we have at least one response (gateway or external) | |
| 1308 + if eval_sign_responses.is_empty() && gateway_response.is_none() { | |
| 1309 return Err(GatewayError::Internal( | |
| 1291 - "[centralized] No responses received from operators".to_string(), | |
| 1310 + "[centralized] No responses received from any operators".to_string(), | |
| 1311 )); | |
| 1312 } | |
| 1313 | |
| 1314 // ═══════════════════════════════════════════════════════════════════════════ | |
| 1296 - // AGGREGATE: All operators evaluated with SAME gateway-provided data | |
| 1315 + // AGGREGATE: All operators (gateway + external) in SINGLE aggregation | |
| 1316 // ═══════════════════════════════════════════════════════════════════════════ | |
| 1298 - info!("[centralized] Aggregating {} signatures", eval_sign_responses.len()); | |
| 1317 + // Convert ConsensusCommitResponse to SignedTaskResponse for aggregation | |
| 1318 + let mut signed_responses: Vec<(OperatorId, SignedTaskResponse)> = eval_sign_responses | |
| 1319 + .iter() | |
| 1320 + .map(|r| { | |
| 1321 + ( | |
| 1322 + r.operator_id, | |
| 1323 + SignedTaskResponse::new(r.task_id, r.task_response.clone(), r.signature.clone(), r.operator_id), | |
| 1324 + ) | |
| 1325 + }) | |
| 1326 + .collect(); | |
| 1327 | |
| 1300 - // Get the canonical TaskResponse from the first response | |
| 1301 - let canonical_task_response = eval_sign_responses[0].task_response.clone(); | |
| 1328 + // Include gateway operator response in direct aggregation (if available) | |
| 1329 + if let Some(gw_response) = &gateway_response { | |
| 1330 + info!( | |
| 1331 + "[centralized] Including gateway operator {} in direct aggregation", | |
| 1332 + gw_response.operator_id() | |
| 1333 + ); | |
| 1334 + signed_responses.push((gw_response.operator_id(), gw_response.clone())); | |
| 1335 + } | |
| 1336 | |
| 1337 + info!( | |
| 1338 + "[centralized] Aggregating {} total signatures (external: {}, gateway: {})", | |
| 1339 + signed_responses.len(), | |
| 1340 + eval_sign_responses.len(), | |
| 1341 + if gateway_response.is_some() { 1 } else { 0 } | |
| 1342 + ); | |
| 1343 + | |
| 1344 + // Get canonical TaskResponse (prefer external operator, fallback to gateway) | |
| 1345 + let canonical_task_response = if !eval_sign_responses.is_empty() { | |
| 1346 + eval_sign_responses[0].task_response.clone() | |
| 1347 + } else if let Some(gw_resp) = &gateway_response { | |
| 1348 + gw_resp.task_response.clone() | |
| 1349 + } else { | |
| 1350 + return Err(GatewayError::Internal( | |
| 1351 + "[centralized] No task response available for canonical reference".to_string(), | |
| 1352 + )); | |
| 1353 + }; | |
| 1354 + | |
| 1355 // DIAGNOSTIC: Log consensus digest for each operator response to detect mismatches | |
| 1304 - for r in &eval_sign_responses { | |
| 1356 + for (operator_id, signed_resp) in &signed_responses { | |
| 1357 let binding: newton_prover_core::newton_prover_task_manager::INewtonProverTaskManager::TaskResponse = | |
| 1306 - r.task_response.clone().into(); | |
| 1358 + signed_resp.task_response.clone().into(); | |
| 1359 let digest = newton_prover_core::common::compute_consensus_digest(&binding); | |
| 1360 info!( | |
| 1361 "[centralized][DEBUG] operator {} response: digest={}, evaluationResult={}", | |
| 1310 - r.operator_id, | |
| 1362 + operator_id, | |
| 1363 digest, | |
| 1312 - newton_prover_core::hex!(&r.task_response.evaluation_result) | |
| 1364 + newton_prover_core::hex!(&signed_resp.task_response.evaluation_result) | |
| 1365 ); | |
| 1366 } | |
| 1367 | |
| 1316 - // Convert ConsensusCommitResponse to SignedTaskResponse for aggregation | |
| 1317 - let signed_responses: Vec<(OperatorId, SignedTaskResponse)> = eval_sign_responses | |
| 1318 - .into_iter() | |
| 1319 - .map(|r| { | |
| 1320 - ( | |
| 1321 - r.operator_id, | |
| 1322 - SignedTaskResponse::new(r.task_id, r.task_response, r.signature, r.operator_id), | |
| 1323 - ) | |
| 1324 - }) | |
| 1325 - .collect(); | |
| 1326 - | |
| 1368 let aggregation_start = Instant::now(); | |
| 1369 // Use source_chain_block for aggregation - operators are registered on source chain | |
| 1370 // and getCheckSignaturesIndices queries operator state at this block | |
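The core of the rewrite above (run both paths concurrently, then merge into one aggregation input) can be sketched in miniature. This uses a std thread as a stand-in for `tokio::join!`; all names are illustrative, not the real types:

```rust
use std::thread;

type Signed = (String, String); // (operator_id, signature)

// Merge external responses with an optional gateway response into a single
// aggregation input, mirroring the signed_responses vector in sync.rs.
fn merge_responses(mut external: Vec<Signed>, gateway: Option<Signed>) -> Vec<Signed> {
    if let Some(gw) = gateway {
        external.push(gw);
    }
    external
}

fn main() {
    // Gateway evaluation on its own thread (like the gateway_task future).
    let gateway = thread::spawn(|| Some(("gateway-op".to_string(), "sig-gw".to_string())));

    // External broadcast runs concurrently (like broadcast_evaluate_and_sign_all).
    let external = vec![
        ("op-1".to_string(), "sig-1".to_string()),
        ("op-2".to_string(), "sig-2".to_string()),
    ];

    // Join both paths, then aggregate once over the combined set.
    let signed_responses = merge_responses(external, gateway.join().unwrap());
    assert_eq!(signed_responses.len(), 3);
    println!("aggregating {} signatures in one pass", signed_responses.len());
}
```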
| ⏺ Now let me build to verify the changes compile correctly: | |
| ⏺ Bash(cargo build -p newton-prover-gateway 2>&1 | head -100) | |
| ⎿ Blocking waiting for file lock on artifact directory | |
| Compiling newton-prover-core v0.1.3 (/Users/denniswon/projects/newton-prover-avs/crates/core) | |
| Compiling newton-prover-metrics v0.1.3 (/Users/denniswon/projects/newton-prover-avs/crates/metrics) | |
| … +6 lines (ctrl+o to expand) | |
| ⎿ (timeout 3m) | |
| ⏺ Excellent! The build succeeded. Let me run the gateway tests to verify: | |
| ⏺ Bash(cargo test -p newton-prover-gateway 2>&1 | head -80) | |
| ⎿ Compiling newton-prover-core v0.1.3 (/Users/denniswon/projects/newton-prover-avs/crates/core) | |
| Compiling newton-prover-metrics v0.1.3 (/Users/denniswon/projects/newton-prover-avs/crates/metrics) | |
| Compiling newton-prover-chainio v0.1.3 (/Users/denniswon/projects/newton-prover-avs/crates/chainio) | |
| … +77 lines (ctrl+o to expand) | |
| ⎿ (timeout 3m) | |
| ⏺ All 22 gateway tests passed. The fix is complete. Let me provide a summary: | |
| ★ Insight ───────────────────────────────────── | |
| The Fix: The BN254 EC operation failure was caused by two separate BLS aggregations being created: | |
| 1. External operators → aggregate_responses() → DIRECT aggregation → APK_G2 = Op1+Op2 | |
| 2. Gateway operator → submit_with_consensus() → CHANNEL-based aggregation → APK_G2 = Gateway-only | |
| Solution: Added submit_with_consensus_and_wait() that: | |
| - Sets skip_aggregator_submission: true to bypass the channel-based aggregator | |
| - Returns the signed response directly for inclusion in the DIRECT aggregation path | |
| - Now ALL operators (gateway + external) aggregate together, producing a single unified APK_G2 | |
| ───────────────────────────────────────────────── | |
| Summary of Changes | |
| 1. crates/gateway/src/rpc/api/worker.rs | |
| - Added skip_aggregator_submission: bool field to the GatewayOperatorTask::Commit variant | |
| - Added submit_with_consensus_and_wait() method that returns the response directly | |
| - Modified process_commit_task() to conditionally skip aggregator submission | |
| 2. crates/gateway/src/task/aggregator.rs | |
| - Added NoOpAggregator struct that implements AggregatorService as a no-op placeholder | |
| 3. crates/gateway/src/rpc/api/sync.rs | |
| - Rewrote broadcast_centralized_and_aggregate() to: | |
| - Run gateway operator and external operators in parallel using tokio::join! | |
| - Include gateway operator's response in the signed_responses vector | |
| - Aggregate ALL operators together in a single aggregate_responses() call | |
| The fix ensures that when using centralized mode with gateway-as-operator, all BLS signatures (gateway + external operators) are | |
| aggregated together into a single AggregatedOperators entry, producing a consistent APK_G2 for successful BN254 pairing verification. | |
| ✻ Brewed for 12m 34s | |
| ❯ commit the changes and push to remote. NEVER add "created by claude" and the committer/author should always be just @denniswon | |
| me. | |
| ⏺ Bash(git status) | |
| ⎿ On branch bls-block-time | |
| Your branch is up to date with 'origin/bls-block-time'. | |
| … +5 lines (ctrl+o to expand) | |
| ⏺ Bash(git diff --stat) | |
| ⎿ (No content) | |
| ⏺ Bash(git log --oneline -5) | |
| ⎿ 2d0601f5 chore: diagnostic logging to help identify the root cause of the BN254 EC Operation Failure | |
| 7c384128 refactor: optimize BLS operator aggregation logic and error handling | |
| 4c52b7f0 feat: upgrade BLS coordinate logging to INFO level for Datadog CSV visibility | |
| … +2 lines (ctrl+o to expand) | |
| ⏺ Bash(git commit -m "fix: unify BLS aggregation paths to resolve BN254 EC operation failure…) | |
| ⎿ [bls-block-time 264bca10] fix: unify BLS aggregation paths to resolve BN254 EC operation failure | |
| 3 files changed, 262 insertions(+), 62 deletions(-) | |
| ⏺ Bash(git push origin bls-block-time) | |
| ⎿ remote: | |
| remote: GitHub found 22 vulnerabilities on newt-foundation/newton-prover-avs's default branch (3 high, 8 moderate, 11 low). To | |
| find out more, visit: | |
| … +4 lines (ctrl+o to expand) | |
| ⏺ Done! The changes have been committed and pushed: | |
| 264bca10 fix: unify BLS aggregation paths to resolve BN254 EC operation failure | |
| Files changed: | |
| - crates/gateway/src/rpc/api/sync.rs - Parallel execution of gateway + external operators with unified aggregation | |
| - crates/gateway/src/rpc/api/worker.rs - Added skip_aggregator_submission flag and submit_with_consensus_and_wait() method | |
| - crates/gateway/src/task/aggregator.rs - Added NoOpAggregator struct |