Skip to content

Instantly share code, notes, and snippets.

@BYJRK
Created August 11, 2024 07:42
Show Gist options
  • Select an option

  • Save BYJRK/b1b893bb5660cea32326025f49116609 to your computer and use it in GitHub Desktop.

Select an option

Save BYJRK/b1b893bb5660cea32326025f49116609 to your computer and use it in GitHub Desktop.
AsyncBarrier copied from Microsoft.VisualStudio.Threading
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public class AsyncBarrier
{
/// <summary>
/// The number of participants being synchronized.
/// </summary>
private readonly int participantCount;
/// <summary>
/// The set of participants who have reached the barrier, with their awaiters that can resume those participants.
/// </summary>
private readonly Stack<Waiter> waiters;
/// <summary>
/// Initializes a new instance of the <see cref="AsyncBarrier"/> class.
/// </summary>
/// <param name="participants">The number of participants.</param>
public AsyncBarrier(int participants)
{
if (participants <= 0)
throw new ArgumentOutOfRangeException(
nameof(participants),
$"Argument {nameof(participants)} must be a positive number."
);
this.participantCount = participants;
// Allocate the stack so no resizing is necessary.
// We don't need space for the last participant, since we never have to store it.
this.waiters = new Stack<Waiter>(participants - 1);
}
/// <inheritdoc cref="SignalAndWait(CancellationToken)" />
public Task SignalAndWait() => this.SignalAndWait(CancellationToken.None).AsTask();
private object SignalAndWait(object none)
{
throw new NotImplementedException();
}
/// <summary>
/// Signals that a participant is ready, and returns a Task
/// that completes when all other participants have also signaled ready.
/// </summary>
/// <param name="cancellationToken">
/// A token that signals the caller's lost interest in waiting.
/// The signal effect of the method is not canceled with the token.
/// </param>
/// <returns>A task which will complete (or may already be completed) when the last participant calls this method.</returns>
public ValueTask SignalAndWait(CancellationToken cancellationToken)
{
lock (this.waiters)
{
if (this.waiters.Count + 1 == this.participantCount)
{
// This is the last one we were waiting for.
// Unleash everyone that preceded this one.
while (this.waiters.Count > 0)
{
Waiter waiter = this.waiters.Pop();
waiter.CompletionSource.TrySetResult(default);
waiter.CancellationRegistration.Dispose();
}
// And allow this one to continue immediately.
return new ValueTask(
cancellationToken.IsCancellationRequested
? Task.FromCanceled(cancellationToken)
: Task.CompletedTask
);
}
else
{
// We need more folks. So suspend this caller.
TaskCompletionSource<EmptyStruct> tcs =
new(TaskCreationOptions.RunContinuationsAsynchronously);
CancellationTokenRegistration ctr;
if (cancellationToken.CanBeCanceled)
{
ctr = cancellationToken.Register(
static (tcs, ct) =>
((TaskCompletionSource<EmptyStruct>)tcs!).TrySetCanceled(ct),
tcs
);
}
else
{
ctr = default;
}
this.waiters.Push(new Waiter(tcs, ctr));
return new ValueTask(tcs.Task);
}
}
}
private readonly struct Waiter(
TaskCompletionSource<EmptyStruct> completionSource,
CancellationTokenRegistration cancellationRegistration
)
{
internal readonly TaskCompletionSource<EmptyStruct> CompletionSource => completionSource;
internal readonly CancellationTokenRegistration CancellationRegistration =>
cancellationRegistration;
}
}
internal readonly struct EmptyStruct
{
/// <summary>
/// Gets an instance of the empty struct.
/// </summary>
internal static EmptyStruct Instance => default;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment