@denniswon
Created February 3, 2026 15:50
⏺ Based on the investigation summary, I understand the root cause: in centralized mode, the Gateway operator goes through a
CHANNEL-based path while external operators go through a DIRECT aggregation path. This creates two separate aggregations instead of
one unified aggregation.
Let me examine the current flow in sync.rs and implement the fix:
⏺ Read 1 file (ctrl+o to expand)
⎿  Loaded crates/CLAUDE.md
⏺ I can see the issue clearly now. Let me trace through the code:
1. Lines 1226-1254: the Gateway operator uses worker.submit_with_consensus() - a fire-and-forget submission into the channel-based path
2. Lines 1261-1287: external operators are broadcast via broadcast_evaluate_and_sign_all()
3. Lines 1317-1325: the signed_responses vector is built from ONLY external operators
4. Lines 1330-1333: aggregate_responses() is called with ONLY external operator signatures
The Gateway operator's response is never included in the direct aggregation! Let me check the worker to see what synchronous methods
are available:
⏺ Read 1 file (ctrl+o to expand)
⏺ I see there's a response_tx: Option<oneshot::Sender<Option<SignedTaskResponse>>> in the task enum - this suggests there's a way to
get a synchronous response. Let me look at the submission methods:
⏺ Read 1 file (ctrl+o to expand)
⏺ I found the key insight. There's a submit_and_wait method for Prepare mode (lines 239-265) that returns the SignedTaskResponse, but
there's no equivalent for Commit mode. The submit_with_consensus is fire-and-forget (line 217: response_tx: None).
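For reference, this is the standard tokio oneshot request-response pattern in isolation - a minimal, runnable sketch where Task, payload, and the string result are toy stand-ins (only the channel wiring mirrors the real worker):

    use tokio::sync::{mpsc, oneshot};

    // Toy stand-in for GatewayOperatorTask: the optional oneshot sender lets
    // the worker hand the result straight back to the caller; None means
    // fire-and-forget.
    struct Task {
        payload: String,
        response_tx: Option<oneshot::Sender<Option<String>>>,
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<Task>(8);

        // Worker loop: process each task, then reply through the oneshot
        // sender if one was provided.
        tokio::spawn(async move {
            while let Some(task) = rx.recv().await {
                let result = Some(format!("processed: {}", task.payload));
                if let Some(response_tx) = task.response_tx {
                    let _ = response_tx.send(result);
                }
            }
        });

        // Caller side: submit a task and await the response synchronously.
        let (response_tx, response_rx) = oneshot::channel();
        tx.send(Task { payload: "commit".into(), response_tx: Some(response_tx) })
            .await
            .expect("worker stopped");
        let response = response_rx.await.expect("worker dropped the response sender");
        println!("{response:?}");
    }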
Let me check if there's already a submit_with_consensus_and_wait method:
⏺ Searched for 1 pattern (ctrl+o to expand)
⏺ There's no such method. The fix requires:
1. Add submit_with_consensus_and_wait method to worker.rs that uses the oneshot channel
2. Update sync.rs to use this method and include the Gateway operator's response in aggregation
Let me check how the dispatcher handles Commit tasks and sends responses:
⏺ Read 1 file (ctrl+o to expand)
⏺ Now I fully understand the bug. The issue is in process_commit_task (lines 448-486):
1. It evaluates and signs correctly (line 458)
2. It submits to the aggregator via submit_to_aggregator_with_retry (line 469) → CHANNEL-based path → SEPARATE aggregation
3. It returns the response (line 472)
So even if we add submit_with_consensus_and_wait, the process_commit_task function STILL submits to the aggregator, creating a
separate aggregation entry!
The fix needs a new processing method that skips the aggregator submission when we want to include the response in direct
aggregation. Let me implement the full fix:
⏺ Read 1 file (ctrl+o to expand)
⏺ Now I have the full picture. I'll implement the fix in two parts:
★ Insight ─────────────────────────────────────
Root Cause: The Gateway operator's response goes through CHANNEL-based aggregation (process_signed_response) which creates a SEPARATE
AggregatedOperators entry, while external operators go through DIRECT aggregation (aggregate_responses). These two paths never
merge.
Solution: Add a synchronous method that returns the Gateway operator's response WITHOUT submitting to the channel-based aggregator,
then include it in the direct aggregation path.
─────────────────────────────────────────────────
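In outline, the fix runs both paths concurrently and merges their results ahead of a single aggregation call. A runnable toy sketch of that shape (the types, function names, and values below are stand-ins, not the real SignedTaskResponse or worker API):

    use tokio::join;

    // Stand-in for (OperatorId, SignedTaskResponse).
    type SignedResponse = (u32, &'static str);

    // Stands in for worker.submit_with_consensus_and_wait(...): returns the
    // gateway operator's response directly instead of routing it to the
    // channel-based aggregator.
    async fn gateway_evaluate() -> Option<SignedResponse> {
        Some((0, "gateway-signature"))
    }

    // Stands in for operator_pool.broadcast_evaluate_and_sign_all(...).
    async fn broadcast_external() -> Vec<SignedResponse> {
        vec![(1, "op1-signature"), (2, "op2-signature")]
    }

    #[tokio::main]
    async fn main() {
        // Run both paths concurrently, then merge into ONE response set so a
        // single aggregate_responses call sees every signer (one APK_G2).
        let (gateway, mut responses) = join!(gateway_evaluate(), broadcast_external());
        if let Some(gw) = gateway {
            responses.push(gw);
        }
        assert_eq!(responses.len(), 3); // gateway + two external operators
        println!("aggregating {} signatures together", responses.len());
    }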
Let me implement the fix:
⏺ Update(crates/gateway/src/rpc/api/worker.rs)
⎿  Added 2 lines
61 aggregator: Arc<dyn AggregatorService>,
62 response_tx: Option<oneshot::Sender<Option<SignedTaskResponse>>>,
63 context: String,
64 + /// If true, skip submitting to aggregator (for direct aggregation path)
65 + skip_aggregator_submission: bool,
66 },
67 }
68
⏺ Update(crates/gateway/src/rpc/api/worker.rs)
⎿  Added 44 lines
218 aggregator,
219 response_tx: None,
220 context: context.to_string(),
221 + skip_aggregator_submission: false,
222 };
223
224 match tokio::time::timeout(self.submit_timeout, self.tx.send(gateway_task)).await {
...
228 }
229 }
230
231 + /// Submits a commit task and waits for the response synchronously.
232 + ///
233 + /// This method is designed for centralized mode where the gateway operator's
234 + /// response needs to be included in the DIRECT aggregation path (via
235 + /// `aggregate_responses`) rather than the channel-based path. This ensures
236 + /// all operators (gateway + external) are aggregated together, producing a
237 + /// single unified APK_G2 for BLS signature verification.
238 + ///
239 + /// # Arguments
240 + ///
241 + /// * `request` - The ConsensusCommitRequest with consensus policyTaskData
242 + /// * `context` - Context string for logging
243 + ///
244 + /// # Returns
245 + ///
246 + /// The signed task response if successful, or `None` if evaluation failed.
247 + pub async fn submit_with_consensus_and_wait(
248 + &self,
249 + request: ConsensusCommitRequest,
250 + context: &str,
251 + ) -> Result<Option<SignedTaskResponse>, WorkerError> {
252 + let (response_tx, response_rx) = oneshot::channel();
253 +
254 + // Note: aggregator is not used when skip_aggregator_submission is true,
255 + // but we need a placeholder. The response will be returned directly
256 + // for inclusion in direct aggregation.
257 + let gateway_task = GatewayOperatorTask::Commit {
258 + request,
259 + aggregator: Arc::new(crate::task::aggregator::NoOpAggregator),
260 + response_tx: Some(response_tx),
261 + context: context.to_string(),
262 + skip_aggregator_submission: true,
263 + };
264 +
265 + match tokio::time::timeout(self.submit_timeout, self.tx.send(gateway_task)).await {
266 + Ok(Ok(())) => {}
267 + Ok(Err(_)) => return Err(WorkerError::WorkerStopped),
268 + Err(_) => return Err(WorkerError::QueueTimeout(self.submit_timeout.as_millis() as u64)),
269 + }
270 +
271 + response_rx.await.map_err(|_| WorkerError::ResponseDropped)
272 + }
273 +
274 /// Submits a task and waits for the response synchronously (prepare mode).
275 ///
276 /// # Arguments
⎿  Found 2 new diagnostic issues in 1 file (ctrl+o to expand)
⏺ Now I need to:
1. Add NoOpAggregator to the aggregator module
2. Fix the dispatcher to handle the new field
3. Update process_commit_task to conditionally skip aggregator submission
Let me check the aggregator trait first:
⏺ Read 1 file (ctrl+o to expand)
⎿  Found 1 new diagnostic issue in 1 file (ctrl+o to expand)
⏺ Let me read more of the aggregator file to see where to add NoOpAggregator:
⏺ Read 1 file (ctrl+o to expand)
⎿  Found 5 new diagnostic issues in 1 file (ctrl+o to expand)
⏺ Let me find the end of the aggregator file to add NoOpAggregator:
⏺ Read 1 file (ctrl+o to expand)
⏺ Now I'll add the NoOpAggregator struct before the tests:
⏺ Update(crates/gateway/src/task/aggregator.rs)
⎿  Added 91 lines
632 }
633 }
634
635 +/// No-op aggregator for use when aggregator submission should be skipped.
636 +///
637 +/// This is used by `GatewayOperatorWorker::submit_with_consensus_and_wait` when
638 +/// the gateway operator's response should be returned directly for inclusion in
639 +/// direct aggregation, rather than being sent to the channel-based aggregator.
640 +pub struct NoOpAggregator;
641 +
642 +#[async_trait]
643 +impl AggregatorService for NoOpAggregator {
644 + async fn initialize_task(
645 + &self,
646 + _task_id: TaskId,
647 + _task_created_block: u64,
648 + _quorum_nums: Vec<u8>,
649 + _quorum_threshold_percentage: u8,
650 + _time_to_expiry: Duration,
651 + _broadcast_count: usize,
652 + ) -> Result<()> {
653 + Ok(())
654 + }
655 +
656 + async fn process_signed_response(&self, _signed_response: SignedTaskResponse) -> Result<()> {
657 + // No-op: skip aggregator submission
658 + Ok(())
659 + }
660 +
661 + async fn process_operator_error(
662 + &self,
663 + _error_response: newton_prover_aggregator::rpc_server::OperatorErrorResponse,
664 + ) -> Result<()> {
665 + Ok(())
666 + }
667 +
668 + async fn wait_for_aggregation(
669 + &self,
670 + _task_id: TaskId,
671 + _timeout_duration: Duration,
672 + ) -> Result<BlsAggregationServiceResponse> {
673 + Err(GatewayError::AggregationFailed(
674 + "NoOpAggregator does not support wait_for_aggregation".to_string(),
675 + ))
676 + }
677 +
678 + async fn submit_aggregated_response(
679 + &self,
680 + _task_request: TaskRequest,
681 + _service_response: BlsAggregationServiceResponse,
682 + ) -> Result<(TaskResponse, TransactionReceipt)> {
683 + Err(GatewayError::AggregationFailed(
684 + "NoOpAggregator does not support submit_aggregated_response".to_string(),
685 + ))
686 + }
687 +
688 + async fn aggregate_responses(
689 + &self,
690 + _task_id: TaskId,
691 + _task_created_block: u64,
692 + _signed_responses: &[(OperatorId, SignedTaskResponse)],
693 + _quorum_numbers: &[u8],
694 + ) -> Result<BlsAggregationServiceResponse> {
695 + Err(GatewayError::AggregationFailed(
696 + "NoOpAggregator does not support aggregate_responses".to_string(),
697 + ))
698 + }
699 +
700 + async fn store_task_response(
701 + &self,
702 + _task_id: TaskId,
703 + _task_response_digest: TaskResponseDigest,
704 + _task_response: BindingTaskResponse,
705 + ) {
706 + // No-op
707 + }
708 +
709 + fn get_task_response(&self, _task_id: TaskId, _task_response_digest: TaskResponseDigest) -> Option<TaskResponse> {
710 + None
711 + }
712 +
713 + async fn update_response_indices(
714 + &self,
715 + _task_id: TaskId,
716 + _service_response: BlsAggregationServiceResponse,
717 + _new_task_created_block: u64,
718 + _quorum_numbers: &[u8],
719 + ) -> Result<BlsAggregationServiceResponse> {
720 + Err(GatewayError::AggregationFailed(
721 + "NoOpAggregator does not support update_response_indices".to_string(),
722 + ))
723 + }
724 +}
725 +
726 #[cfg(test)]
727 mod tests {
728 use super::*;
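Design note: NoOpAggregator is a null-object stand-in. The Commit task variant still needs an Arc<dyn AggregatorService> for its aggregator field, but when skip_aggregator_submission is true none of those methods should ever be reached - so each one either succeeds trivially or returns an explicit 'not supported' error rather than silently pretending to aggregate.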
⏺ Now let me fix the dispatcher in worker.rs to handle the skip_aggregator_submission field:
⏺ Update(crates/gateway/src/rpc/api/worker.rs)
⎿  Added 2 lines
401 aggregator,
402 response_tx,
403 context,
404 + skip_aggregator_submission,
405 } => {
406 // Commit mode: use pre-provided consensus policyTaskData
407 let task_id = request.task_id;
...
412 &operator_core,
413 &operator_id,
414 &retry_config,
415 + skip_aggregator_submission,
416 ))
417 .catch_unwind()
418 .await;
⏺ Now let me update process_commit_task to accept and use the skip_aggregator_submission flag:
⏺ Update(crates/gateway/src/rpc/api/worker.rs)
⎿  Added 19 lines, removed 3 lines
493 ///
494 /// This ensures the gateway operator signs the same digest as external operators,
495 /// enabling successful BLS signature aggregation.
496 + ///
497 + /// # Arguments
498 + ///
499 + /// * `request` - The consensus commit request with pre-provided policyTaskData
500 + /// * `context` - Context string for logging
501 + /// * `aggregator` - The aggregator service (may be NoOpAggregator if skipping)
502 + /// * `operator_core` - The operator core for evaluation and signing
503 + /// * `operator_id` - The operator ID for logging
504 + /// * `retry_config` - Retry configuration for aggregator submission
505 + /// * `skip_aggregator_submission` - If true, skip submitting to the channel-based
506 + /// aggregator. This is used when the response will be included in direct
507 + /// aggregation (via `aggregate_responses`) instead.
508 async fn process_commit_task(
509 request: ConsensusCommitRequest,
510 context: &str,
...
512 operator_core: &Arc<OperatorCore>,
513 operator_id: &str,
514 retry_config: &RetryConfig,
515 + skip_aggregator_submission: bool,
516 ) -> Option<SignedTaskResponse> {
517 let task_id = request.task_id;
518
...
523 context = %context,
524 operator_id = %operator_id,
525 mode = "commit",
526 + skip_aggregator = skip_aggregator_submission,
527 "gateway operator task evaluated successfully with consensus data"
528 );
529
516 - // Submit to aggregator with retry logic
517 - Self::submit_to_aggregator_with_retry(&aggregator, signed_response.clone(), task_id, retry_config)
518 - .await;
530 + // Only submit to aggregator if not skipping (direct aggregation path)
531 + if !skip_aggregator_submission {
532 + Self::submit_to_aggregator_with_retry(&aggregator, signed_response.clone(), task_id, retry_config)
533 + .await;
534 + }
535
536 Some(signed_response)
537 }
⎿  Found 5 new diagnostic issues in 1 file (ctrl+o to expand)
⏺ I see - there's also a call to process_prepare_task that's now showing an error. Let me check which call is incorrect:
⏺ Read 1 file (ctrl+o to expand)
⏺ Let me verify the function definition is correct:
⏺ Read 1 file (ctrl+o to expand)
⎿  Found 1 new diagnostic issue in 1 file (ctrl+o to expand)
⏺ The function definition looks correct. The diagnostic might be stale. Let me also add Debug derive to NoOpAggregator:
⏺ Update(crates/gateway/src/task/aggregator.rs)
⎿  Added 1 line
637 /// This is used by `GatewayOperatorWorker::submit_with_consensus_and_wait` when
638 /// the gateway operator's response should be returned directly for inclusion in
639 /// direct aggregation, rather than being sent to the channel-based aggregator.
640 +#[derive(Debug)]
641 pub struct NoOpAggregator;
642
643 #[async_trait]
⏺ Now let me update sync.rs to use the new method and include the Gateway operator's response in direct aggregation:
⏺ Read 1 file (ctrl+o to expand)
⏺ Now I need to rewrite this section to:
1. Run Gateway operator evaluation in parallel with external operator broadcast
2. Include Gateway operator's response in the signed_responses vector
⏺ Update(crates/gateway/src/rpc/api/sync.rs)
⎿  Added 100 lines, removed 59 lines
1220 wasm_args: task_request.wasm_args.clone().unwrap_or_default(),
1221 };
1222
1223 - // Submit gateway operator with the SAME centralized consensus data (fire-and-forget)
1224 - // This ensures the gateway operator signs the same digest as external operators,
1225 - // which is critical for BLS signature aggregation to succeed.
1226 - if let Some(worker) = &self.operator_worker {
1227 - if let Err(e) = worker
1228 - .submit_with_consensus(
1229 - consensus_commit_request.clone(),
1230 - self.aggregator.clone(),
1231 - "sync-centralized",
1232 - )
1233 - .await
1234 - {
1235 - match &e {
1236 - crate::rpc::api::worker::WorkerError::QueueTimeout(ms) => {
1237 - warn!(
1223 + let phase2_request = build_evaluate_and_sign_request(task_id, &consensus_commit_request);
1224 +
1225 + // ═══════════════════════════════════════════════════════════════════════════
1226 + // PARALLEL EXECUTION: Gateway operator + external operators
1227 + // ═══════════════════════════════════════════════════════════════════════════
1228 + // Run gateway operator evaluation in parallel with external operator broadcast.
1229 + // This ensures the gateway operator's response is included in the DIRECT
1230 + // aggregation path (via aggregate_responses) rather than creating a separate
1231 + // aggregation through the channel-based path.
1232 + let phase2_start = Instant::now();
1233 +
1234 + // Gateway operator task (returns response for direct aggregation, skips channel-based aggregator)
1235 + let gateway_task = async {
1236 + if let Some(worker) = &self.operator_worker {
1237 + match worker
1238 + .submit_with_consensus_and_wait(consensus_commit_request.clone(), "sync-centralized")
1239 + .await
1240 + {
1241 + Ok(Some(response)) => {
1242 + info!(
1243 task_id = %task_id,
1239 - timeout_ms = %ms,
1244 + operator_id = %response.operator_id(),
1245 mode = "centralized",
1241 - "gateway operator queue full, task not submitted"
1246 + "gateway operator response received for direct aggregation"
1247 );
1248 + Some(response)
1249 }
1244 - _ => {
1245 - warn!(task_id = %task_id, error = %e, mode = "centralized", "failed to submit to gateway ope
-rator worker");
1250 + Ok(None) => {
1250 + warn!(task_id = %task_id, mode = "centralized", "gateway operator evaluation returned None");
1252 + None
1253 }
1254 + Err(e) => {
1255 + match &e {
1256 + crate::rpc::api::worker::WorkerError::QueueTimeout(ms) => {
1257 + warn!(
1258 + task_id = %task_id,
1259 + timeout_ms = %ms,
1260 + mode = "centralized",
1261 + "gateway operator queue full, task not submitted"
1262 + );
1263 + }
1264 + _ => {
1265 + warn!(task_id = %task_id, error = %e, mode = "centralized", "failed to get gateway operator response");
1266 + }
1267 + }
1268 + None
1269 + }
1270 }
1271 } else {
1249 - info!(
1250 - task_id = %task_id,
1251 - mode = "centralized",
1252 - "gateway operator submitted with centralized consensus data"
1253 - );
1272 + None
1273 }
1255 - }
1274 + };
1275
1257 - let phase2_request = build_evaluate_and_sign_request(task_id, &consensus_commit_request);
1258 -
1259 - // Broadcast to ALL external operators (no Prepare phase filtering)
1260 - let phase2_start = Instant::now();
1261 - let eval_sign_responses = match self
1276 + // External operators task
1277 + let external_task = self
1278 .operator_pool
1263 - .broadcast_evaluate_and_sign_all(&phase2_request, quorum_numbers.to_vec(), quorum_threshold, timeout)
1264 - .await
1265 - {
1279 + .broadcast_evaluate_and_sign_all(&phase2_request, quorum_numbers.to_vec(), quorum_threshold, timeout);
1280 +
1281 + // Run both tasks in parallel
1282 + let (gateway_response, external_result) = tokio::join!(gateway_task, external_task);
1283 +
1284 + let phase2_duration = phase2_start.elapsed();
1285 + newton_prover_metrics::record_centralized_evaluate_duration(phase2_duration.as_secs_f64());
1286 +
1287 + let eval_sign_responses = match external_result {
1288 Ok(responses) => {
1267 - let phase2_duration = phase2_start.elapsed();
1268 - newton_prover_metrics::record_centralized_evaluate_duration(phase2_duration.as_secs_f64());
1289 newton_prover_metrics::set_centralized_response_count(responses.len());
1290 info!(
1271 - "[centralized] Collected {} evaluate+sign responses in {} ms",
1291 + "[centralized] Collected {} external operator responses in {} ms",
1292 responses.len(),
1293 phase2_duration.as_millis()
1294 );
1295 responses
1296 }
1297 Err(e) => {
1278 - let phase2_duration = phase2_start.elapsed();
1279 - newton_prover_metrics::record_centralized_evaluate_duration(phase2_duration.as_secs_f64());
1298 error!(
1299 "[centralized] Broadcast failed after {} ms: {}",
1300 phase2_duration.as_millis(),
...
1304 }
1305 };
1306
1289 - if eval_sign_responses.is_empty() {
1307 + // Check we have at least one response (gateway or external)
1308 + if eval_sign_responses.is_empty() && gateway_response.is_none() {
1309 return Err(GatewayError::Internal(
1291 - "[centralized] No responses received from operators".to_string(),
1310 + "[centralized] No responses received from any operators".to_string(),
1311 ));
1312 }
1313
1314 // ═══════════════════════════════════════════════════════════════════════════
1296 - // AGGREGATE: All operators evaluated with SAME gateway-provided data
1315 + // AGGREGATE: All operators (gateway + external) in SINGLE aggregation
1316 // ═══════════════════════════════════════════════════════════════════════════
1298 - info!("[centralized] Aggregating {} signatures", eval_sign_responses.len());
1317 + // Convert ConsensusCommitResponse to SignedTaskResponse for aggregation
1318 + let mut signed_responses: Vec<(OperatorId, SignedTaskResponse)> = eval_sign_responses
1319 + .iter()
1320 + .map(|r| {
1321 + (
1322 + r.operator_id,
1323 + SignedTaskResponse::new(r.task_id, r.task_response.clone(), r.signature.clone(), r.operator_id),
1324 + )
1325 + })
1326 + .collect();
1327
1300 - // Get the canonical TaskResponse from the first response
1301 - let canonical_task_response = eval_sign_responses[0].task_response.clone();
1328 + // Include gateway operator response in direct aggregation (if available)
1329 + if let Some(gw_response) = &gateway_response {
1330 + info!(
1331 + "[centralized] Including gateway operator {} in direct aggregation",
1332 + gw_response.operator_id()
1333 + );
1334 + signed_responses.push((gw_response.operator_id(), gw_response.clone()));
1335 + }
1336
1337 + info!(
1338 + "[centralized] Aggregating {} total signatures (external: {}, gateway: {})",
1339 + signed_responses.len(),
1340 + eval_sign_responses.len(),
1341 + if gateway_response.is_some() { 1 } else { 0 }
1342 + );
1343 +
1344 + // Get canonical TaskResponse (prefer external operator, fallback to gateway)
1345 + let canonical_task_response = if !eval_sign_responses.is_empty() {
1346 + eval_sign_responses[0].task_response.clone()
1347 + } else if let Some(gw_resp) = &gateway_response {
1348 + gw_resp.task_response.clone()
1349 + } else {
1350 + return Err(GatewayError::Internal(
1351 + "[centralized] No task response available for canonical reference".to_string(),
1352 + ));
1353 + };
1354 +
1355 // DIAGNOSTIC: Log consensus digest for each operator response to detect mismatches
1304 - for r in &eval_sign_responses {
1356 + for (operator_id, signed_resp) in &signed_responses {
1357 let binding: newton_prover_core::newton_prover_task_manager::INewtonProverTaskManager::TaskResponse =
1306 - r.task_response.clone().into();
1358 + signed_resp.task_response.clone().into();
1359 let digest = newton_prover_core::common::compute_consensus_digest(&binding);
1360 info!(
1361 "[centralized][DEBUG] operator {} response: digest={}, evaluationResult={}",
1310 - r.operator_id,
1362 + operator_id,
1363 digest,
1312 - newton_prover_core::hex!(&r.task_response.evaluation_result)
1364 + newton_prover_core::hex!(&signed_resp.task_response.evaluation_result)
1365 );
1366 }
1367
1316 - // Convert ConsensusCommitResponse to SignedTaskResponse for aggregation
1317 - let signed_responses: Vec<(OperatorId, SignedTaskResponse)> = eval_sign_responses
1318 - .into_iter()
1319 - .map(|r| {
1320 - (
1321 - r.operator_id,
1322 - SignedTaskResponse::new(r.task_id, r.task_response, r.signature, r.operator_id),
1323 - )
1324 - })
1325 - .collect();
1326 -
1368 let aggregation_start = Instant::now();
1369 // Use source_chain_block for aggregation - operators are registered on source chain
1370 // and getCheckSignaturesIndices queries operator state at this block
⏺ Now let me build to verify the changes compile correctly:
⏺ Bash(cargo build -p newton-prover-gateway 2>&1 | head -100)
⎿   Blocking waiting for file lock on artifact directory
Compiling newton-prover-core v0.1.3 (/Users/denniswon/projects/newton-prover-avs/crates/core)
Compiling newton-prover-metrics v0.1.3 (/Users/denniswon/projects/newton-prover-avs/crates/metrics)
… +6 lines (ctrl+o to expand)
⎿  (timeout 3m)
⏺ Excellent! The build succeeded. Let me run the gateway tests to verify:
⏺ Bash(cargo test -p newton-prover-gateway 2>&1 | head -80)
⎿   Compiling newton-prover-core v0.1.3 (/Users/denniswon/projects/newton-prover-avs/crates/core)
Compiling newton-prover-metrics v0.1.3 (/Users/denniswon/projects/newton-prover-avs/crates/metrics)
Compiling newton-prover-chainio v0.1.3 (/Users/denniswon/projects/newton-prover-avs/crates/chainio)
… +77 lines (ctrl+o to expand)
⎿  (timeout 3m)
⏺ All 22 gateway tests passed. The fix is complete. Let me provide a summary:
★ Insight ─────────────────────────────────────
The Fix: The BN254 EC operation failure was caused by two separate BLS aggregations being created:
1. External operators → aggregate_responses() → DIRECT aggregation → APK_G2 = Op1+Op2
2. Gateway operator → submit_with_consensus() → CHANNEL-based aggregation → APK_G2 = Gateway-only
Solution: Added submit_with_consensus_and_wait() that:
- Sets skip_aggregator_submission: true to bypass the channel-based aggregator
- Returns the signed response directly for inclusion in the DIRECT aggregation path
- Now ALL operators (gateway + external) aggregate together, producing a single unified APK_G2
─────────────────────────────────────────────────
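To see why the split is fatal at verification time, consider the pairing check in the BLS-over-BN254 setup assumed here (signatures in G1, public keys in G2, every operator signing the same digest m). Aggregation is just point addition, and verification only holds when both sums range over the same signer set S:

    \sigma_{agg} = \sum_{i \in S} \sigma_i \in G_1,        APK = \sum_{i \in S} pk_i \in G_2

    e(\sigma_{agg}, g_2) = e(H(m), APK)

With two parallel aggregations, the signature sum and the APK are built from different signer sets, so the pairing equation cannot balance and the on-chain EC operation fails.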
Summary of Changes
1. crates/gateway/src/rpc/api/worker.rs
- Added skip_aggregator_submission: bool field to GatewayOperatorTask::Commit enum
- Added submit_with_consensus_and_wait() method that returns the response directly
- Modified process_commit_task() to conditionally skip aggregator submission
2. crates/gateway/src/task/aggregator.rs
- Added NoOpAggregator struct that implements AggregatorService as a no-op placeholder
3. crates/gateway/src/rpc/api/sync.rs
- Rewrote broadcast_centralized_and_aggregate() to:
- Run gateway operator and external operators in parallel using tokio::join!
- Include gateway operator's response in the signed_responses vector
- Aggregate ALL operators together in a single aggregate_responses() call
The fix ensures that when using centralized mode with gateway-as-operator, all BLS signatures (gateway + external operators) are
aggregated together into a single AggregatedOperators entry, producing a consistent APK_G2 for successful BN254 pairing verification.
✻ Brewed for 12m 34s
❯ commit the changes and push to remote. NEVER add "created by claude" and the committer/author should always be just @denniswon (me).
⏺ Bash(git status)
⎿  On branch bls-block-time
Your branch is up to date with 'origin/bls-block-time'.
… +5 lines (ctrl+o to expand)
⏺ Bash(git diff --stat)
⎿  (No content)
⏺ Bash(git log --oneline -5)
⎿  2d0601f5 chore: diagnostic logging to help identify the root cause of the BN254 EC Operation Failure
7c384128 refactor: optimize BLS operator aggregation logic and error handling
4c52b7f0 feat: upgrade BLS coordinate logging to INFO level for Datadog CSV visibility
… +2 lines (ctrl+o to expand)
⏺ Bash(git commit -m "fix: unify BLS aggregation paths to resolve BN254 EC operation failure…)
⎿  [bls-block-time 264bca10] fix: unify BLS aggregation paths to resolve BN254 EC operation failure
3 files changed, 262 insertions(+), 62 deletions(-)
⏺ Bash(git push origin bls-block-time)
⎿ remote:
remote: GitHub found 22 vulnerabilities on newt-foundation/newton-prover-avs's default branch (3 high, 8 moderate, 11 low). To find out more, visit:
… +4 lines (ctrl+o to expand)
⏺ Done! The changes have been committed and pushed:
264bca10 fix: unify BLS aggregation paths to resolve BN254 EC operation failure
Files changed:
- crates/gateway/src/rpc/api/sync.rs - Parallel execution of gateway + external operators with unified aggregation
- crates/gateway/src/rpc/api/worker.rs - Added skip_aggregator_submission flag and submit_with_consensus_and_wait() method
- crates/gateway/src/task/aggregator.rs - Added NoOpAggregator struct