Skip to content

Instantly share code, notes, and snippets.

@pbk20191
Last active February 9, 2026 00:20
Show Gist options
  • Select an option

  • Save pbk20191/84e1164b4e21fc994ac622a8172616ff to your computer and use it in GitHub Desktop.

Select an option

Save pbk20191/84e1164b4e21fc994ac622a8172616ff to your computer and use it in GitHub Desktop.
CFRunLoop derived DispatchSerialQueue
//
// RunLoopGCDExecutor.swift
// SampleSample
//
// Created by 박병관 on 1/13/26.
//
import Foundation
import Dispatch
import _Concurrency
/// Executor bound to the run loop of the thread that creates it.
///
/// Jobs are enqueued on `wrappedQueue`, a serial queue targeting the dispatch
/// queue that `peekDefaultRunLoopQueue()` reads out of the current run loop's
/// default mode. The queue is only drained while `run()` or `runUntil(_:)`
/// is spinning the run loop with the queue's Mach port installed.
final class _RunLoopGCDExecutor {
    /// Run loop captured at creation time; every entry point checks against it.
    nonisolated(unsafe)
    private let runLoop = RunLoop.current
    /// Dispatch queue obtained from the current run loop's default mode.
    private let queue: DispatchSerialQueue = RunLoop.current.getCFRunLoop().peekDefaultRunLoopQueue()
    /// Queue that jobs are actually submitted to; it targets the run loop queue.
    private let wrappedQueue: DispatchSerialQueue = DispatchSerialQueue(label: "D", target: RunLoop.current.getCFRunLoop().peekDefaultRunLoopQueue())

    /// Stops the captured run loop via `CFRunLoopStop`.
    public func stop() {
        CFRunLoopStop(runLoop.getCFRunLoop())
    }

    /// Spins the run loop in the default mode until it reports `.finished` or `.stopped`.
    ///
    /// - Throws: `CancellationError` when called from inside a Swift task or
    ///   from a thread whose run loop is not the captured one.
    public func run() throws {
        let port = try installQueuePort()
        defer {
            runLoop.remove(port, forMode: .default)
        }
        var value = CFRunLoopRunResult.handledSource
        while value != .finished && value != .stopped {
            value = CFRunLoopRunInMode(.defaultMode, Date.distantFuture.timeIntervalSinceNow, false)
        }
    }

    /// Spins the run loop one pass at a time until `condition` returns `true`.
    ///
    /// - Parameter condition: Evaluated before every run loop pass.
    /// - Throws: `CancellationError` when called from inside a Swift task, from
    ///   the wrong thread, or when the run loop could not be started
    ///   (`RunLoop.run(mode:before:)` returned `false`).
    public func runUntil(_ condition: () -> Bool) throws {
        let port = try installQueuePort()
        defer {
            runLoop.remove(port, forMode: .default)
        }
        while !condition() {
            if !runLoop.run(mode: .default, before: .distantFuture) {
                throw CancellationError()
            }
        }
    }

    /// Validates the calling context and installs the queue's Mach port on the
    /// run loop in the default mode. The caller is responsible for removing the
    /// returned port again.
    private func installQueuePort() throws -> Port {
        // Must not be entered from a Swift task: spinning the run loop would block it.
        let noTask = withUnsafeCurrentTask(body: { $0 == nil })
        guard noTask else {
            throw CancellationError()
        }
        // Only the thread that owns the captured run loop may drive it.
        guard runLoop.isEqual(to: RunLoop.current) else {
            throw CancellationError()
        }
        let machPort = LibDispatchNameSpace.symbols.create()._dispatch_runloop_root_queue_get_port_4CF(queue)
        let port = NSMachPort.port(withMachPort: machPort, options: [])
        runLoop.add(port, forMode: .default)
        return port
    }
}
// MARK: - SerialExecutor / TaskExecutor
extension _RunLoopGCDExecutor: SerialExecutor, TaskExecutor {
    /// Forwards the job to the queue that targets the run loop's dispatch queue.
    func enqueue(_ job: consuming ExecutorJob) {
        wrappedQueue.enqueue(job)
    }

    /// Witness for the `SerialExecutor` requirement, which takes `Self`:
    /// two executors share an execution context when they wrap the same run loop.
    func isSameExclusiveExecutionContext(other: _RunLoopGCDExecutor) -> Bool {
        return runLoop.isEqual(other.runLoop)
    }

    /// Original overload taking the `RunLoopGCDExecutor` actor. It does not satisfy
    /// the protocol requirement; it is kept only so existing call sites keep compiling.
    func isSameExclusiveExecutionContext(other: RunLoopGCDExecutor) -> Bool {
        return runLoop.isEqual(other.runLoop)
    }

    /// Reports whether the calling thread's run loop is the captured one.
    func isIsolatingCurrentContext() -> Bool? {
        runLoop.isEqual(RunLoop.current)
    }

    /// Traps when called from a thread whose run loop is not the captured one.
    func checkIsolated() {
        precondition(runLoop.isEqual(RunLoop.current))
    }
}
/// Helpers that read fields out of the `CFRunLoop` object at fixed byte offsets.
///
/// NOTE(review): the offsets 2664, 80 and 160 are hard-coded assumptions about the
/// in-memory layout of the run loop and its mode objects — confirm them against
/// every OS version this code is expected to run on.
fileprivate extension CFRunLoop {
    /// Loads the pointer stored 2664 bytes into the run loop object and treats it as a `CFSet`.
    private func peekRunLoopModeSet() -> CFSet {
        Unmanaged<CFSet>.fromOpaque(unsafeBitCast(self, to: UnsafeMutableRawPointer.self).advanced(by: 2664).load(as: UnsafeRawPointer?.self)!).takeUnretainedValue()
    }

    /// Returns the element of the mode set whose name (read at offset 80) equals
    /// `CFRunLoopMode.defaultMode`. Traps when no such element exists.
    private func peekDefaultRunLoopMode() -> AnyObject {
        let modes = CFSetCreateCopy(nil, self.peekRunLoopModeSet())
        var found: AnyObject? = nil
        CFSetApplyFunction(modes, { c_value, c_context in
            // The context points at an `AnyObject?` variable, so it must be bound as
            // an optional; binding it as non-optional `AnyObject` reads a possibly
            // null reference through a non-optional type.
            guard let c_value, let slot = c_context?.assumingMemoryBound(to: AnyObject?.self) else {
                return
            }
            // The first match wins; ignore the remaining elements.
            if slot.pointee != nil {
                return
            }
            let modeName = c_value.advanced(by: 80).load(as: UnsafeRawPointer?.self).flatMap({ Unmanaged<CFString>.fromOpaque($0).takeUnretainedValue() })
            guard let modeName else {
                return
            }
            if CFEqual(modeName, CFRunLoopMode.defaultMode.rawValue) {
                slot.pointee = Unmanaged<AnyObject>.fromOpaque(c_value).takeUnretainedValue()
            }
        }, &found)
        return found!
    }

    /// Loads the pointer stored 160 bytes into the default mode object and treats
    /// it as a `DispatchSerialQueue`. Traps when that pointer is null.
    func peekDefaultRunLoopQueue() -> DispatchSerialQueue {
        let mode = peekDefaultRunLoopMode()
        let rawQueue = Unmanaged.passUnretained(mode).toOpaque().advanced(by: 160).load(as: UnsafeRawPointer?.self)
        return Unmanaged<DispatchSerialQueue>.fromOpaque(rawQueue!).takeUnretainedValue()
    }
    // func peekDefault
}
extension _RunLoopGCDExecutor {
    /// Namespace for libdispatch entry points that are resolved at run time with `dlsym`.
    enum LibDispatchNameSpace {
        var symbols: symbols { .create() }

        /// Table of C function pointers looked up by name.
        ///
        /// A symbol that `dlsym` cannot find is stored as a null function pointer;
        /// calling such a member is undefined behaviour, so the table is only
        /// usable on systems that export every listed symbol.
        struct symbols: BitwiseCopyable {
            typealias _dispatch_runloop_root_queue_create_4CF_callback = @convention(c) (UnsafePointer<CChar>?, CFOptionFlags) -> Unmanaged<dispatch_queue_serial_t>
            typealias _dispatch_runloop_root_queue_perform_4CF_callback = @convention(c) (dispatch_queue_t) -> DarwinBoolean
            typealias _dispatch_runloop_root_queue_get_port_4CF_callback = @convention(c) (dispatch_queue_t) -> mach_port_t
            typealias dispatch_set_context_callback = @convention(c) (dispatch_object_t, UnsafeMutableRawPointer?) -> Void
            typealias dispatch_get_context_callback = @convention(c) (dispatch_object_t) -> UnsafeMutableRawPointer?
            typealias dispatch_set_finalizer_f_callback = @convention(c) (dispatch_object_t, CFTreeReleaseCallBack?) -> Void

            let _dispatch_runloop_root_queue_create_4CF: _dispatch_runloop_root_queue_create_4CF_callback
            let _dispatch_runloop_root_queue_perform_4CF: _dispatch_runloop_root_queue_perform_4CF_callback
            let _dispatch_runloop_root_queue_get_port_4CF: _dispatch_runloop_root_queue_get_port_4CF_callback
            let dispatch_set_context: dispatch_set_context_callback
            let dispatch_get_context: dispatch_get_context_callback
            let dispatch_set_finalizer_f: dispatch_set_finalizer_f_callback

            /// Resolves every member with `dlsym`. Only symbols that back a stored
            /// member are looked up; the former `_dispatch_lane_invoke` and
            /// `_dispatch_lane_push` lookups were never read and have been dropped.
            private init() {
                Bundle(for: NSClassFromString("OS_dispatch_queue")!).load()
                // Pseudo-handle -2, i.e. the value the original passed to `dlsym`
                // (RTLD_DEFAULT on Darwin).
                let handle = UnsafeMutableRawPointer(bitPattern: -2)!
                func address(of name: String) -> UnsafeMutableRawPointer? {
                    name.withCString { dlsym(handle, $0) }
                }
                _dispatch_runloop_root_queue_create_4CF = unsafeBitCast(address(of: "_dispatch_runloop_root_queue_create_4CF"), to: _dispatch_runloop_root_queue_create_4CF_callback.self)
                _dispatch_runloop_root_queue_perform_4CF = unsafeBitCast(address(of: "_dispatch_runloop_root_queue_perform_4CF"), to: _dispatch_runloop_root_queue_perform_4CF_callback.self)
                _dispatch_runloop_root_queue_get_port_4CF = unsafeBitCast(address(of: "_dispatch_runloop_root_queue_get_port_4CF"), to: _dispatch_runloop_root_queue_get_port_4CF_callback.self)
                dispatch_set_context = unsafeBitCast(address(of: "dispatch_set_context"), to: dispatch_set_context_callback.self)
                dispatch_get_context = unsafeBitCast(address(of: "dispatch_get_context"), to: dispatch_get_context_callback.self)
                dispatch_set_finalizer_f = unsafeBitCast(address(of: "dispatch_set_finalizer_f"), to: dispatch_set_finalizer_f_callback.self)
            }

            /// Builds a fresh table inside an autorelease pool.
            static func create() -> Self {
                autoreleasepool {
                    .init()
                }
            }
        }
    }
}
extension _RunLoopGCDExecutor {
    /// Namespace for CoreFoundation functions resolved by name from the CoreFoundation bundle.
    enum CoreFoundationNamespace {
        /// Table of CoreFoundation entry points stored as CF callback-typed function pointers.
        struct symbols: BitwiseCopyable {
            let retain: CFTreeRetainCallBack
            let release: CFTreeReleaseCallBack
            let hash: CFSetHashCallBack
            let equal: CFArrayEqualCallBack
            let copyDescription: CFBagCopyDescriptionCallBack
            let _CFRunLoopGet2: @convention(c) (CFRunLoop) -> RunLoop

            /// Looks up all names in one call and reinterprets each returned pointer
            /// as the member's function type, in the same order as the name list.
            private init() {
                let bundle = CFBundleGetBundleWithIdentifier("com.apple.CoreFoundation" as CFString)!
                // Order matters: index i below corresponds to names[i].
                // Previously considered: "_CFRunLoopSourceWakeUpRunLoops", "_CFFireTimer".
                let names = [
                    "CFRetain",
                    "CFRelease",
                    "CFHash",
                    "CFEqual",
                    "CFCopyDescription",
                    "_CFRunLoopGet2",
                ]
                var pointers = ContiguousArray<UnsafeMutableRawPointer?>(repeating: nil, count: names.count)
                pointers.withUnsafeMutableBufferPointer { buffer in
                    CFBundleGetFunctionPointersForNames(bundle, names as CFArray, buffer.baseAddress!)
                }
                let resolved = pointers
                func entry<T>(_ index: Int, as type: T.Type) -> T {
                    unsafeBitCast(resolved[index], to: type)
                }
                retain = entry(0, as: CFTreeRetainCallBack.self)
                release = entry(1, as: CFTreeReleaseCallBack.self)
                hash = entry(2, as: CFSetHashCallBack.self)
                equal = entry(3, as: CFArrayEqualCallBack.self)
                copyDescription = entry(4, as: CFBagCopyDescriptionCallBack.self)
                _CFRunLoopGet2 = entry(5, as: (@convention(c) (CFRunLoop) -> RunLoop).self)
            }

            /// Builds a fresh table inside an autorelease pool.
            static func create() -> Self {
                autoreleasepool {
                    .init()
                }
            }
        }
    }
}
import Foundation
import Dispatch
/// Mach port delegate that drains a dispatch queue through a C `perform` function
/// each time a message arrives, and that can host the port on its own thread.
class Receiver: NSObject, NSMachPortDelegate {
    /// Queue handed to `perform`; held weakly.
    weak var ref: DispatchQueue?
    /// Run loop of the thread that executed `run(_:)`; held weakly.
    weak var runLoop: CFRunLoop?
    /// C function invoked repeatedly for as long as it returns true.
    let perform: @convention(c) (DispatchQueue) -> DarwinBoolean

    init(perform: @convention(c) (DispatchQueue) -> DarwinBoolean) {
        self.perform = perform
    }

    /// Calls `perform` on the queue until it reports false, wrapping every call
    /// in its own autorelease pool. Does nothing once the queue is gone.
    func handleMachMessage(_ msg: UnsafeMutableRawPointer) {
        guard let target = ref else { return }
        var hasMore: Bool
        repeat {
            hasMore = autoreleasepool { perform(target).boolValue }
        } while hasMore
    }

    /// Thread entry point: registers `port` (with this object as its delegate) on
    /// the current run loop in the common modes, then runs the default mode for
    /// as long as the port is valid and the run loop keeps running.
    @objc func run(_ port: NSMachPort) {
        autoreleasepool {
            self.runLoop = CFRunLoopGetCurrent()
            port.setDelegate(self)
            RunLoop.current.add(port, forMode: .common)
        }
        autoreleasepool {
            while true {
                guard port.isValid else { break }
                guard RunLoop.current.run(mode: .default, before: .distantFuture) else { break }
            }
        }
    }
}
import Combine
/// Namespace for libdispatch entry points that are resolved at run time with `dlsym`.
enum LibDispatchNameSpace {
    var symbols: symbols { .create() }

    /// Table of C function pointers looked up by name.
    ///
    /// A symbol that `dlsym` cannot find is stored as a null function pointer;
    /// calling such a member is undefined behaviour, so the table is only usable
    /// on systems that export every listed symbol.
    struct symbols: BitwiseCopyable {
        typealias _dispatch_runloop_root_queue_create_4CF_callback = @convention(c) (UnsafePointer<CChar>?, CFOptionFlags) -> Unmanaged<dispatch_queue_serial_t>
        typealias _dispatch_runloop_root_queue_perform_4CF_callback = @convention(c) (dispatch_queue_t) -> DarwinBoolean
        typealias _dispatch_runloop_root_queue_get_port_4CF_callback = @convention(c) (dispatch_queue_t) -> mach_port_t
        typealias dispatch_set_context_callback = @convention(c) (dispatch_object_t, UnsafeMutableRawPointer?) -> Void
        typealias dispatch_get_context_callback = @convention(c) (dispatch_object_t) -> UnsafeMutableRawPointer?
        typealias dispatch_set_finalizer_f_callback = @convention(c) (dispatch_object_t, CFTreeReleaseCallBack?) -> Void

        let _dispatch_runloop_root_queue_create_4CF: _dispatch_runloop_root_queue_create_4CF_callback
        let _dispatch_runloop_root_queue_perform_4CF: _dispatch_runloop_root_queue_perform_4CF_callback
        let _dispatch_runloop_root_queue_get_port_4CF: _dispatch_runloop_root_queue_get_port_4CF_callback
        let dispatch_set_context: dispatch_set_context_callback
        let dispatch_get_context: dispatch_get_context_callback
        let dispatch_set_finalizer_f: dispatch_set_finalizer_f_callback

        /// Resolves every member with `dlsym`. Only symbols that back a stored
        /// member are looked up; the former `_dispatch_lane_invoke` and
        /// `_dispatch_lane_push` lookups were never read and have been dropped.
        private init() {
            Bundle(for: NSClassFromString("OS_dispatch_queue")!).load()
            // Pseudo-handle -2, i.e. the value the original passed to `dlsym`
            // (RTLD_DEFAULT on Darwin).
            let handle = UnsafeMutableRawPointer(bitPattern: -2)!
            func address(of name: String) -> UnsafeMutableRawPointer? {
                name.withCString { dlsym(handle, $0) }
            }
            _dispatch_runloop_root_queue_create_4CF = unsafeBitCast(address(of: "_dispatch_runloop_root_queue_create_4CF"), to: _dispatch_runloop_root_queue_create_4CF_callback.self)
            _dispatch_runloop_root_queue_perform_4CF = unsafeBitCast(address(of: "_dispatch_runloop_root_queue_perform_4CF"), to: _dispatch_runloop_root_queue_perform_4CF_callback.self)
            _dispatch_runloop_root_queue_get_port_4CF = unsafeBitCast(address(of: "_dispatch_runloop_root_queue_get_port_4CF"), to: _dispatch_runloop_root_queue_get_port_4CF_callback.self)
            dispatch_set_context = unsafeBitCast(address(of: "dispatch_set_context"), to: dispatch_set_context_callback.self)
            dispatch_get_context = unsafeBitCast(address(of: "dispatch_get_context"), to: dispatch_get_context_callback.self)
            dispatch_set_finalizer_f = unsafeBitCast(address(of: "dispatch_set_finalizer_f"), to: dispatch_set_finalizer_f_callback.self)
        }

        /// Builds a fresh table inside an autorelease pool.
        static func create() -> Self {
            autoreleasepool {
                .init()
            }
        }

        nonisolated(unsafe)
        static let key = DispatchSpecificKey<AnyCancellable>()

        /// Creates a run-loop root queue and starts a thread named "looper" whose
        /// run loop services the queue's Mach port through a `Receiver`.
        ///
        /// An `AnyCancellable` is attached to the queue as an associated object;
        /// when it is cancelled it invalidates the port and stops the receiver's
        /// run loop. NOTE(review): this presumably happens when the queue is
        /// deallocated and its associated objects are released — confirm.
        ///
        /// - Returns: The newly created serial queue.
        func runLoopDispatchQueue() -> DispatchSerialQueue {
            let runLoopQueue = _dispatch_runloop_root_queue_create_4CF(nil, 0).takeRetainedValue()
            let mach = _dispatch_runloop_root_queue_get_port_4CF(runLoopQueue)
            let cfPort = NSMachPort(machPort: mach, options: [])
            let delegate = Receiver(perform: self._dispatch_runloop_root_queue_perform_4CF)
            delegate.ref = runLoopQueue
            cfPort.setDelegate(delegate)
            // The delegate's address doubles as the association key; the closure
            // captures both the port and the delegate.
            objc_setAssociatedObject(runLoopQueue, Unmanaged.passUnretained(delegate).toOpaque(), AnyCancellable {
                cfPort.invalidate()
                delegate.runLoop.flatMap(CFRunLoopStop)
            }, .OBJC_ASSOCIATION_RETAIN_NONATOMIC)
            let th = Thread(target: delegate, selector: #selector(delegate.run), object: cfPort)
            th.name = "looper"
            th.start()
            return runLoopQueue
        }
    }
}
//@_silgen_name("CFRunLoopSourceWakeUpRunLoops")
//func _CFRunLoopSourceWakeUpRunLoops(_ source: CFRunLoopSource) -> Void
/// Namespace for CoreFoundation functions resolved by name from the CoreFoundation bundle.
enum CoreFoundationNamespace {
    /// Table of CoreFoundation entry points stored as CF callback-typed function pointers.
    struct symbols: BitwiseCopyable {
        let retain: CFTreeRetainCallBack
        let release: CFTreeReleaseCallBack
        let hash: CFSetHashCallBack
        let equal: CFArrayEqualCallBack
        let copyDescription: CFBagCopyDescriptionCallBack
        let _CFRunLoopGet2: @convention(c) (CFRunLoop) -> RunLoop

        /// Looks up all names in one call and reinterprets each returned pointer
        /// as the member's function type, in the same order as the name list.
        private init() {
            let bundle = CFBundleGetBundleWithIdentifier("com.apple.CoreFoundation" as CFString)!
            // Order matters: index i below corresponds to names[i].
            // Previously considered: "_CFRunLoopSourceWakeUpRunLoops", "_CFFireTimer".
            let names = [
                "CFRetain",
                "CFRelease",
                "CFHash",
                "CFEqual",
                "CFCopyDescription",
                "_CFRunLoopGet2",
            ]
            var pointers = ContiguousArray<UnsafeMutableRawPointer?>(repeating: nil, count: names.count)
            pointers.withUnsafeMutableBufferPointer { buffer in
                CFBundleGetFunctionPointersForNames(bundle, names as CFArray, buffer.baseAddress!)
            }
            let resolved = pointers
            func entry<T>(_ index: Int, as type: T.Type) -> T {
                unsafeBitCast(resolved[index], to: type)
            }
            retain = entry(0, as: CFTreeRetainCallBack.self)
            release = entry(1, as: CFTreeReleaseCallBack.self)
            hash = entry(2, as: CFSetHashCallBack.self)
            equal = entry(3, as: CFArrayEqualCallBack.self)
            copyDescription = entry(4, as: CFBagCopyDescriptionCallBack.self)
            _CFRunLoopGet2 = entry(5, as: (@convention(c) (CFRunLoop) -> RunLoop).self)
        }

        /// Builds a fresh table inside an autorelease pool.
        static func create() -> Self {
            autoreleasepool {
                .init()
            }
        }
    }
}
import Synchronization
/// Serial and task executor that runs enqueued jobs from a custom `CFRunLoopSource`
/// on the run loop captured when the executor is created.
///
/// Jobs are stored in a priority-ordered `CFBinaryHeap` guarded by a lock. The
/// source's `perform` callback drains the heap, so jobs only run while the
/// source is installed and the run loop is being driven (see `run()` and
/// `runUntil(_:)`). An executor created on the main run loop forwards jobs to
/// `MainActor.shared` instead.
final class CurrentRunLoopExecutor: SerialExecutor, TaskExecutor {
// RunLoopExecutor, SchedulingExecutor
/// Creates an empty binary heap whose elements are boxed `UnownedJob` values.
/// The comparator is reversed so the heap's minimum is the highest-priority job.
private static func makeBinaryHeap() -> CFBinaryHeap {
var heapCb = kCFStringBinaryHeapCallBacks
// Start from the string callbacks and replace everything that is string-specific.
heapCb.retain = kCFTypeDictionaryKeyCallBacks.retain
heapCb.release = kCFTypeDictionaryKeyCallBacks.release
heapCb.copyDescription = kCFTypeDictionaryKeyCallBacks.copyDescription
heapCb.compare = {
// reverse order
let lhs = Unmanaged<AnyObject>.fromOpaque($0!).takeUnretainedValue() as! UnownedJob
let rhs = Unmanaged<AnyObject>.fromOpaque($1!).takeUnretainedValue() as! UnownedJob
// The third argument (the heap's compare context) is not used.
let _ = $2
if rhs.priority < lhs.priority {
return .compareLessThan
}
if rhs.priority > lhs.priority {
return .compareGreaterThan
}
return .compareEqualTo
}
return CFBinaryHeapCreate(nil, 0, &heapCb, nil)!
}
/// State shared between the executor and its run loop source callback. It also
/// serves as the target object for timers created by the Combine scheduler methods.
class RunLoopContextStore {
// Pending jobs; swapped for a fresh heap whenever the source drains it.
var heap:CFBinaryHeap = CurrentRunLoopExecutor.makeBinaryHeap()
// Guards `heap`.
let lock = NSRecursiveLock()
// Unowned executor references passed to `runSynchronously`; assigned in the executor's `init`.
var serial:UnownedSerialExecutor!
var task:UnownedTaskExecutor?
// Incremented while `run()` / `runUntil(_:)` is active and decremented on exit.
var enterCount = 0
deinit {
// Jobs still in the heap at this point would never run.
precondition(CFBinaryHeapGetCount(heap) == 0, "leak of unprocessed tasks")
}
/// Timer callback for repeating schedules: calls the block while the
/// `RepeatInfo` flag is still true, otherwise invalidates the timer.
@objc func repeatedBlock(_ timer:Timer) {
let info = unsafeDowncast(timer.userInfo as AnyObject, to: RepeatInfo.self)
if info.boolean.load(ordering: .acquiring) {
info.block()
} else {
// counter.subtract(1, ordering: .relaxed)
timer.invalidate()
}
}
/// Timer callback for one-shot schedules: reinterprets `userInfo` as an
/// Objective-C block and invokes it when present.
@objc func perform( timer:Timer) {
let block = timer.userInfo.flatMap {
unsafeBitCast($0 as AnyObject, to: (@convention(block) () -> Void).self)
}
if let block {
block()
// counter.subtract(1, ordering: .relaxed)
} else {
// isActive = false
// CFRunLoopStop(CFRunLoopGetCurrent())
}
}
}
/// Pair of executor references; only instantiated inside the source's perform callback.
struct RunningInfo {
var task:UnownedTaskExecutor?
var serial:UnownedSerialExecutor
}
// Task-local reference to the executor whose `run()` / `runUntil(_:)` is currently on the stack.
@TaskLocal
private static var _current:Unmanaged<CurrentRunLoopExecutor>?
// Run loop of the thread that created this executor.
nonisolated(unsafe)
private let runLoop = RunLoop.current
// True when `runLoop` is the main run loop (set in `init`).
let _isMainExecutor:Bool
// The store referenced by the source's context `info`; read back in `init`.
nonisolated(unsafe)
private let store:RunLoopContextStore
// Custom run loop source whose `perform` callback drains the job heap.
nonisolated(unsafe)
private let source:CFRunLoopSource = {
let sym = CoreFoundationNamespace.symbols.create()
var source = CFRunLoopSourceContext()
// CoreFoundation's own retain/release/hash/equal/description functions manage the `info` object.
source.retain = sym.retain
source.release = sym.release
source.copyDescription = sym.copyDescription
source.hash = sym.hash
source.equal = sym.equal
source.perform = {
let store = Unmanaged<RunLoopContextStore>.fromOpaque($0!).takeUnretainedValue()
var newHeap = CurrentRunLoopExecutor.makeBinaryHeap()
// NOTE(review): `info` is never read after this line.
var info = RunningInfo.init(task: store.task, serial: store.serial)
// Take the pending jobs and leave an empty heap behind, so the jobs run without holding the lock.
store.lock.withLock {
swap(&store.heap, &newHeap)
}
// Reinterprets the C function so it can be called with an autoreleasing out-pointer to `AnyObject?`.
let functionCall = unsafeBitCast(CFBinaryHeapGetMinimumIfPresent, to: ((CFBinaryHeap, AutoreleasingUnsafeMutablePointer<AnyObject?>) -> Bool).self)
weak var value:AnyObject?
let taskExecutor = store.task
let serialExecutor = store.serial!
// Pop jobs in heap order (highest priority first) until the heap is empty.
while functionCall(newHeap, &value), let job = value.flatMap({ ($0 as! UnownedJob) }) {
value = nil
CFBinaryHeapRemoveMinimumValue(newHeap)
if let taskExecutor {
job.runSynchronously(isolatedTo: serialExecutor, taskExecutor: taskExecutor)
} else {
job.runSynchronously(on: serialExecutor)
}
}
}
let buffer = RunLoopContextStore()
// NOTE(review): passed unretained; presumably the source retains it through the `retain` callback set above — confirm.
source.info = Unmanaged.passUnretained(buffer).toOpaque()
return CFRunLoopSourceCreate(nil, 0, &source)
}()
/// Helper object used by `peekLocal()` to read the task-local `_current`
/// from inside a run loop callout.
class Invoker:NSObject {
private(set) var invoked = false
var storage:CurrentRunLoopExecutor?
@objc func invoke() {
invoked = true
storage = CurrentRunLoopExecutor._current?.takeUnretainedValue()
}
}
/// Schedules `Invoker.invoke` on the current thread and spins the run loop in
/// the default mode until it has fired, then returns the executor it observed
/// in `_current`, if any.
private static func peekLocal() -> CurrentRunLoopExecutor? {
let invoker = Invoker()
invoker.perform(#selector(invoker.invoke), on: Thread.current, with: nil, waitUntilDone: false)
while !invoker.invoked {
let future = CFDateGetAbsoluteTime(Date.distantFuture as CFDate)
CFRunLoopRunInMode(.defaultMode, future, true)
}
if let ref = invoker.storage {
return ref
}
return nil
}
/// Returns an executor for the calling thread: the one bound to `_current` when
/// it matches the current run loop, one discovered through `peekLocal()` when
/// the run loop is already running, or otherwise a newly created executor.
/// On the main thread a new executor is always created.
internal static func currentExecutor() -> CurrentRunLoopExecutor {
if Thread.isMainThread {
// we are safe
return CurrentRunLoopExecutor()
} else if let t = _current?.takeUnretainedValue(), t.runLoop == .current {
// current RunLooperRunner === currentRunLoop
// this is also safe
return t
} else if RunLoop.current.currentMode != nil, let local = peekLocal() {
return local
} else {
/*
1. RunLoop is active but does not have `CurrentRunLoopExecutor` in nested loop
2. RunLoop is inactive -> which should always create new one
*/
return CurrentRunLoopExecutor()
}
}
/// Recovers the store from the source's context and records the unowned
/// executor references that the perform callback passes to each job.
private init() {
var context = CFRunLoopSourceContext()
CFRunLoopSourceGetContext(source, &context)
store = Unmanaged.fromOpaque(context.info).takeUnretainedValue()
_isMainExecutor = runLoop === RunLoop.main
store.serial = asUnownedSerialExecutor()
store.task = asUnownedTaskExecutor()
}
/// Moves every pending job from this executor's heap into `current`'s heap and signals `current`'s source.
private func transferJob(to current:CurrentRunLoopExecutor) {
var newHeap = Self.makeBinaryHeap()
self.store.lock.withLock {
swap(&self.store.heap, &newHeap)
}
current.store.lock.withLock {
// `$0` is a heap element; `$1` is the context pointer, i.e. the destination heap.
CFBinaryHeapApplyFunction(newHeap, {
CFBinaryHeapAddValue(Unmanaged<CFBinaryHeap>.fromOpaque($1!).takeUnretainedValue(), $0)
}, Unmanaged.passUnretained(current.store.heap).toOpaque())
}
CFRunLoopSourceSignal(current.source)
}
/// Installs the source (if it is not already installed) and drives the run loop
/// in the default mode for as long as `CFRunLoopRunInMode` returns `.timedOut`.
///
/// - Throws: `CancellationError` when called on a thread whose run loop is not
///   the captured one, or when another executor is already driving this run
///   loop; in the latter case pending jobs are first handed to that executor.
public func run() throws {
guard runLoop === RunLoop.current else {
throw CancellationError()
}
let shouldRemoveSource:Bool
if !CFRunLoopContainsSource(runLoop.getCFRunLoop(), source, .defaultMode) {
if runLoop.currentMode != nil, let t = Self.peekLocal(), t !== self {
// already installed runloop detected
transferJob(to: t)
throw CancellationError()
}
CFRunLoopAddSource(runLoop.getCFRunLoop(), source, .defaultMode)
shouldRemoveSource = true
} else {
shouldRemoveSource = false
}
// Only remove the source if this call was the one that installed it.
defer {
if shouldRemoveSource {
CFRunLoopRemoveSource(runLoop.getCFRunLoop(), source, .defaultMode)
}
}
do {
store.enterCount += 1
defer { store.enterCount -= 1}
// Bind `_current` so code running inside the loop can find this executor.
CurrentRunLoopExecutor.$_current.withValue(.passUnretained(self)) {
let future = CFDateGetAbsoluteTime(Date.distantFuture as CFDate)
while CFRunLoopRunInMode(.defaultMode, future, false) == .timedOut {
}
}
}
}
/// Same setup as `run()`, but runs one default-mode pass at a time until `condition` returns true.
///
/// - Parameter condition: Evaluated before every run loop pass.
/// - Throws: `CancellationError` under the same conditions as `run()`.
public func runUntil(_ condition: () -> Bool) throws {
guard runLoop === RunLoop.current else {
throw CancellationError()
}
let shouldRemoveSource:Bool
if !CFRunLoopContainsSource(runLoop.getCFRunLoop(), source, .defaultMode) {
if runLoop.currentMode != nil, let t = Self.peekLocal(), t !== self {
// already installed runloop detected
transferJob(to: t)
throw CancellationError()
}
CFRunLoopAddSource(runLoop.getCFRunLoop(), source, .defaultMode)
shouldRemoveSource = true
} else {
shouldRemoveSource = false
}
// Only remove the source if this call was the one that installed it.
defer {
if shouldRemoveSource {
CFRunLoopRemoveSource(runLoop.getCFRunLoop(), source, .defaultMode)
}
}
do {
store.enterCount += 1
defer {
store.enterCount -= 1
}
while !condition() {
// The result of `run(mode:before:)` is ignored here.
CurrentRunLoopExecutor.$_current.withValue(.passUnretained(self)) {
RunLoop.current.run(mode: .default, before: .distantFuture)
}
}
}
}
/// Stops the captured run loop via `CFRunLoopStop`.
public func stop() {
CFRunLoopStop(runLoop.getCFRunLoop())
}
/// Traps when called from a thread whose run loop is not the captured one.
func checkIsolated() {
precondition(runLoop === RunLoop.current)
}
/// Reports whether the calling thread's run loop is the captured one.
func isIsolatingCurrentContext() -> Bool? {
return runLoop === RunLoop.current
}
/// Uses complex equality so `isSameExclusiveExecutionContext(other:)` takes part in executor comparison.
func asUnownedSerialExecutor() -> UnownedSerialExecutor {
UnownedSerialExecutor(complexEquality: self)
}
/// Two executors share an execution context when they wrap the same run loop object.
func isSameExclusiveExecutionContext(other: CurrentRunLoopExecutor) -> Bool {
runLoop === other.runLoop
}
/// Queues a job: forwarded to the main actor for a main run loop executor,
/// otherwise added to the heap before the source is signalled and the run loop woken.
func enqueue(_ job: consuming ExecutorJob) {
if _isMainExecutor {
MainActor.shared.enqueue(UnownedJob(job))
} else {
let job = UnownedJob(job)
// Box the job as an object so it can be stored in the CF heap.
let ref = job as AnyObject
self.store.lock.withLock {
let t = Unmanaged.passUnretained(ref).toOpaque()
CFBinaryHeapAddValue(self.store.heap, t)
}
CFRunLoopSourceSignal(source)
CFRunLoopWakeUp(runLoop.getCFRunLoop())
// let ref = asUnownedSerialExecutor()
// runLoop.perform {
// job.runSynchronously(on: ref)
// }
}
}
// Always false, independent of `_isMainExecutor`.
public var isMainExecutor: Bool { false }
// var asScheduling: (any SchedulingExecutor)? {
// return self
// }
//
/// Unavailable placeholder for clock-based scheduling. The body only evaluates
/// and discards expressions; it never schedules the job.
@available(*,unavailable)
public func enqueue<C: Clock>(_ job: consuming ExecutorJob,
at instant: C.Instant,
tolerance: C.Duration? = nil,
clock: C)
{
if let continuous = clock as? ContinuousClock {
let i = (instant as! ContinuousClock.Instant)
continuous.now.duration(to: i)
} else if let suspending = clock as? SuspendingClock {
(instant as! SuspendingClock.Instant)
} else {
clock
}
// _dispatchEnqueue(job, at: instant, tolerance: tolerance, clock: clock,
// executor: self, global: false)
}
}
import Combine
// MARK: - Combine.Scheduler
/// Lets the executor act as a Combine scheduler that uses `RunLoop`'s time and option types.
extension CurrentRunLoopExecutor: Combine.Scheduler {
    typealias SchedulerOptions = RunLoop.SchedulerOptions
    typealias SchedulerTimeType = RunLoop.SchedulerTimeType

    /// Schedules `action` on the captured run loop as soon as possible.
    /// The main and non-main paths were identical, so there is a single call.
    func schedule(options: SchedulerOptions?, _ action: @escaping () -> Void) {
        runLoop.schedule(options: options, action)
    }

    /// Schedules `action` to run once at `date`.
    ///
    /// On a non-main executor this installs a one-shot `Timer` in the common
    /// modes, targeting the context store. `tolerance` is applied to the timer;
    /// it was previously ignored on this path.
    func schedule(
        after date: SchedulerTimeType,
        tolerance: SchedulerTimeType.Stride,
        options: SchedulerOptions?,
        _ action: @escaping () -> Void
    ) {
        if _isMainExecutor {
            return runLoop.schedule(after: date, tolerance: tolerance, options: options, action)
        }
        // Convert to an Objective-C block so it can travel through `userInfo`.
        let block: @convention(block) () -> Void = action
        let timer = Timer(fireAt: date.date, interval: 0, target: store, selector: #selector(RunLoopContextStore.perform(timer:)), userInfo: block, repeats: false)
        // Negative tolerances are meaningless for a timer; clamp to zero.
        timer.tolerance = max(0, tolerance.timeInterval)
        runLoop.add(timer, forMode: .common)
    }

    /// Schedules `action` to run at `date` and then every `interval`.
    ///
    /// On a non-main executor this installs a repeating `Timer` in the common
    /// modes. `tolerance` is applied to the timer; it was previously ignored on
    /// this path.
    ///
    /// - Returns: A cancellable that ends the repetition.
    func schedule(
        after date: SchedulerTimeType,
        interval: SchedulerTimeType.Stride,
        tolerance: SchedulerTimeType.Stride,
        options: SchedulerOptions?,
        _ action: @escaping () -> Void
    ) -> any Cancellable {
        if _isMainExecutor {
            return runLoop.schedule(after: date, interval: interval, tolerance: tolerance, options: options, action)
        }
        let info = RepeatInfo(block: action)
        let timer = Timer(fireAt: date.date, interval: interval.timeInterval, target: store, selector: #selector(RunLoopContextStore.repeatedBlock(_:)), userInfo: info, repeats: true)
        timer.tolerance = max(0, tolerance.timeInterval)
        info.timer = timer
        runLoop.add(timer, forMode: .common)
        return AnyCancellable(info)
    }

    var minimumTolerance: SchedulerTimeType.Stride {
        runLoop.minimumTolerance
    }

    var now: SchedulerTimeType {
        runLoop.now
    }

    /// Cancellation token for a repeating schedule.
    ///
    /// `cancel()` only flips the flag and pulls the timer's fire date forward;
    /// the timer is invalidated from its own callback
    /// (`RunLoopContextStore.repeatedBlock(_:)`) once it sees the flag cleared.
    class RepeatInfo: Combine.Cancellable {
        /// True while the schedule is active.
        let boolean = Atomic(true)
        weak var timer: Timer?
        let block: () -> Void

        init(block: @escaping () -> Void) {
            self.block = block
        }

        func cancel() {
            // Only the first cancel reschedules the timer.
            if boolean.compareExchange(expected: true, desired: false, ordering: .acquiringAndReleasing).exchanged {
                timer?.fireDate = Date()
            }
        }
    }
}
//
// RunLoopGCDExecutor3.swift
// Sample
//
// Created by 박병관 on 2/6/26.
//
import Foundation
import Dispatch
#if canImport(Darwin) && canImport(ObjectiveC)
import Darwin
import ObjectiveC
import Synchronization
/// Actor whose serial executor is backed by a libdispatch "run loop root queue"
/// that is drained on the thread that created the actor.
///
/// Jobs are enqueued on `wrappedQueue`, which is created inactive and targets
/// the root queue. `run()` / `runUntil(_:)` install the root queue's Mach port
/// on the current run loop, activate `wrappedQueue`, and spin the run loop;
/// the port delegate drains the root queue whenever the port fires.
actor RunLoopGCDExecutor {
// Run loop of the creating thread.
nonisolated(unsafe)
unowned let runLoop = RunLoop.current
// Creating thread; used for the isolation checks.
nonisolated(unsafe)
private let threadId = pthread_self()
// Root queue created through `_dispatch_runloop_root_queue_create_4CF`.
nonisolated
private let queue:DispatchSerialQueue
// Queue that jobs are submitted to; targets `queue`.
nonisolated let wrappedQueue:DispatchSerialQueue
// Whether the actor was created on the main thread.
private let isMainThread:Bool = Thread.isMainThread
nonisolated var unownedExecutor: UnownedSerialExecutor {
asUnownedSerialExecutor()
}
/// Mach port delegate that drains the parent's root queue when the port fires.
class Delegate: NSObject, NSMachPortDelegate {
// Set when a port message arrives while a drain is already in progress.
var didNested = false
// True while `handleMachMessage` is draining the queue.
var isRunning = false
// Stop state as used in this file: 0 = idle, 1 = stop requested, 2 = stop acknowledged.
let atomicFlag = Atomic<Int>(0)
unowned let parent:RunLoopGCDExecutor
init(_ parent:RunLoopGCDExecutor) {
self.parent = parent
}
/// Drains the root queue unless a drain is already running, then acknowledges
/// a pending stop request by stopping the current run loop.
func handleMachMessage(_ msg: UnsafeMutableRawPointer) {
// NOTE(review): `k` is never read.
let k = msg.assumingMemoryBound(to: mach_msg_header_t.self)
if isRunning {
// Re-entrant delivery: remember it and let the run loop observer re-trigger the drain later.
didNested = true
return
}
isRunning = true
defer {
isRunning = false
}
// Keep performing until the queue reports nothing more to do.
while Namespace.perform(parent.queue) == true {
}
if atomicFlag.load(ordering: .acquiring) == 1 {
if atomicFlag.compareExchange(expected: 1, desired: 2, ordering: .acquiringAndReleasing).exchanged {
CFRunLoopStop(CFRunLoopGetCurrent())
}
}
}
}
/// Bindings to libdispatch symbols (declared with `@_silgen_name`) plus the
/// keys used to locate the running executor.
private enum Namespace {
@_silgen_name("_dispatch_runloop_root_queue_create_4CF")
private static func _create_queue(
_ label:UnsafePointer<Int8>?,
_ flags:UInt32
) -> Unmanaged<DispatchSerialQueue>
/// Creates a root queue and takes ownership of the returned reference.
public static func create_root_queue(
_ label:String?
) -> DispatchSerialQueue {
_create_queue(label, 0).takeRetainedValue()
}
@_silgen_name("_dispatch_runloop_root_queue_get_port_4CF")
public static func get_port(
_ queue:DispatchSerialQueue
) -> mach_port_t
@_silgen_name("_dispatch_runloop_root_queue_perform_4CF")
public static func perform(
_ queue:DispatchSerialQueue
) -> CBool
@_silgen_name("_dispatch_runloop_root_queue_wakeup_4CF")
public static func wakeup(
_ queue:DispatchSerialQueue
)
@_silgen_name("dispatch_block_create_with_qos_class")
private static func __dispatch_block_create_with_qos_class(
_ flags:__dispatch_block_flags_t,
_ qos_class:qos_class_t,
_ relative_priority:Int32,
_ block:@convention(block) () -> Void
) -> Unmanaged<AnyObject>
/// Swift-typed wrapper: converts the flags and QoS, then reinterprets the returned object as a block.
public static func _dispatch_block_create_with_qos_class(
_ flags:DispatchWorkItemFlags,
_ qos:DispatchQoS,
_ block:@convention(block) () -> Void
) -> (@convention(block) () -> Void) {
let ref = __dispatch_block_create_with_qos_class(.init(rawValue: flags.rawValue), qos.qosClass.rawValue, Int32(qos.relativePriority), block).takeRetainedValue()
return unsafeBitCast(ref, to: (@convention(block) () -> Void).self)
}
// Queue-specific key set on `wrappedQueue` while its executor is running.
public static let _key:DispatchSpecificKey<Unmanaged<RunLoopGCDExecutor>> = .init()
// Task-local reference to the executor whose run method is currently on the stack.
@TaskLocal public static var _current:Unmanaged<RunLoopGCDExecutor>? = nil
}
/// Creates the root queue and an initially inactive `wrappedQueue` targeting it.
private init() {
let root = Namespace.create_root_queue(nil)
self.queue = root
wrappedQueue = DispatchSerialQueue(label: "RunLoopGCDExecutor", attributes: [.initiallyInactive], autoreleaseFrequency: .workItem, target: root)
}
deinit {
// DispatchSerialQueue.init(label: "", attributes: [.initiallyInactive])
// NOTE(review): presumably required because an initially inactive queue must be activated before it is released — confirm.
wrappedQueue.activate()
}
/// Returns the executor associated with the calling context: the one stored as
/// a queue-specific value, the task-local one if it wraps the current run loop,
/// or otherwise a new instance.
nonisolated
public static func current() -> RunLoopGCDExecutor {
if let t = DispatchQueue.getSpecific(key: Namespace._key)?.takeUnretainedValue() {
return t
}
if let t = Namespace._current?.takeUnretainedValue(), t.runLoop === RunLoop.current {
return t
}
return .init()
}
/// Requests that the running loop stop. Does nothing when no queue-specific
/// executor is set on `wrappedQueue` or its port has no `Delegate`.
nonisolated
public func stop() {
guard
let rootQueue = wrappedQueue.getSpecific(key: Namespace._key)?.takeUnretainedValue().queue,
let delegate = NSMachPort.port(withMachPort: Namespace.get_port(rootQueue), options: []).delegate() as? Delegate else {
return
}
if delegate.atomicFlag.compareExchange(expected: 0, desired: 1, ordering: .acquiringAndReleasing).exchanged {
// Enqueue an empty block; presumably this makes the port fire so the delegate sees the stop request — confirm.
wrappedQueue.async {
}
}
// delegate.parent
}
/// Drives the run loop in the default mode until a stop request has been acknowledged.
///
/// - Throws: `CancellationError` when called from inside a Swift task, from a
///   thread other than the creating one, or while a different executor is
///   bound to `_current`; in the last case `wrappedQueue` is retargeted to that
///   executor's queue and activated first.
public func run() throws {
guard withUnsafeCurrentTask(body: { $0 == nil}) else {
throw CancellationError()
}
guard threadId == pthread_self() else {
throw CancellationError()
}
do {
if let ref = Namespace._current {
guard ref.toOpaque() == Unmanaged.passUnretained(self).toOpaque() else {
// Another executor is already running here: hand our jobs to it.
if let t = Namespace._current?.takeUnretainedValue() {
self.wrappedQueue.setTarget(queue: t.wrappedQueue)
self.wrappedQueue.activate()
}
throw CancellationError()
}
}
}
let pinned = queue
let port = NSMachPort.port(withMachPort: Namespace.get_port(pinned), options: [])
// A delegate already on the port means this is a nested call on the same executor.
let nested = port.delegate() != nil
let delegate = port.delegate() as! Delegate? ?? {
let t = Delegate.init(self)
port.setDelegate(t)
RunLoop.current.add(port, forMode: .common)
// port.schedule(in: .current, forMode: .common)
return t
}()
wrappedQueue.setSpecific(key: Namespace._key, value: .passUnretained(self))
// Outermost call only: when a nested delivery was skipped, re-trigger the drain by enqueueing an empty block.
let observer:CFRunLoopObserver? = if !nested {
CFRunLoopObserverCreateWithHandler(nil, ([.exit, .beforeWaiting] as CFRunLoopActivity).rawValue, true, 0) { [delegate] _, activity in
if delegate.didNested, !delegate.isRunning {
delegate.didNested = false
delegate.parent.wrappedQueue.async {}
}
}
} else {
nil
}
if let observer {
CFRunLoopAddObserver(CFRunLoopGetCurrent(), observer, .commonModes)
}
// Cleanup: reset the stop flag; the outermost call also removes the port and clears the queue-specific value.
defer {
observer.flatMap(CFRunLoopObserverInvalidate)
delegate.atomicFlag.store(0, ordering: .releasing)
if !nested {
RunLoop.current.remove(port, forMode: .common)
wrappedQueue.setSpecific(key: Namespace._key, value: nil)
}
}
Namespace.$_current.withValue(Unmanaged.passUnretained(self)) {
wrappedQueue.activate()
// Loop until the delegate has moved the flag to 2 (stop acknowledged).
while delegate.atomicFlag.load(ordering: .acquiring) != 2 {
CFRunLoopRunInMode(.defaultMode, Date.distantFuture.timeIntervalSinceNow, false)
}
}
}
/// Same setup as `run()`, but runs one default-mode pass at a time and returns
/// once `condition` is true after a pass.
///
/// - Parameter condition: Evaluated after every run loop pass.
/// - Throws: `CancellationError` under the same conditions as `run()`, and also
///   when `RunLoop.run(mode:before:)` returns `false`.
public func runUntil(_ condition: () -> Bool) throws {
guard withUnsafeCurrentTask(body: { $0 == nil}) else {
throw CancellationError()
}
guard threadId == pthread_self() else {
throw CancellationError()
}
do {
if let ref = Namespace._current {
guard ref.toOpaque() == Unmanaged.passUnretained(self).toOpaque() else {
// Another executor is already running here: hand our jobs to it.
if let t = Namespace._current?.takeUnretainedValue() {
self.wrappedQueue.setTarget(queue: t.wrappedQueue)
self.wrappedQueue.activate()
}
throw CancellationError()
}
}
}
let pinned = queue
let port = NSMachPort.port(withMachPort: Namespace.get_port(pinned), options: [])
// A delegate already on the port means this is a nested call on the same executor.
let nested = port.delegate() != nil
let delegate = port.delegate() as! Delegate? ?? {
let t = Delegate(self)
port.setDelegate(t)
RunLoop.current.add(port, forMode: .common)
return t
}()
wrappedQueue.setSpecific(key: Namespace._key, value: .passUnretained(self))
// Outermost call only: when a nested delivery was skipped, re-trigger the drain. Unlike `run()`, this wakes the root queue directly.
let observer:CFRunLoopObserver? = if !nested {
CFRunLoopObserverCreateWithHandler(nil, ([.exit, .beforeWaiting] as CFRunLoopActivity).rawValue, true, 0) { [delegate] _, activity in
if delegate.didNested, !delegate.isRunning {
delegate.didNested = false
Namespace.wakeup(delegate.parent.queue)
}
}
} else {
nil
}
if let observer {
CFRunLoopAddObserver(CFRunLoopGetCurrent(), observer, .commonModes)
}
// Cleanup: reset the stop flag; the outermost call also removes the port and clears the queue-specific value.
defer {
observer.flatMap(CFRunLoopObserverInvalidate)
delegate.atomicFlag.store(0, ordering: .releasing)
if !nested {
RunLoop.current.remove(port, forMode: .common)
wrappedQueue.setSpecific(key: Namespace._key, value: nil)
}
}
try Namespace.$_current.withValue(Unmanaged.passUnretained(self)) {
wrappedQueue.activate()
// At least one pass always runs before `condition` is checked.
repeat {
if !runLoop.run(mode: .default, before: .distantFuture) {
throw CancellationError()
}
} while !condition()
}
}
}
// MARK: - SerialExecutor
extension RunLoopGCDExecutor: SerialExecutor {
    /// Hands the job to the main actor when the executor was created on the
    /// main thread, and to `wrappedQueue` otherwise.
    nonisolated
    func enqueue(_ job: consuming ExecutorJob) {
        guard isMainThread else {
            wrappedQueue.enqueue(job)
            return
        }
        MainActor.shared.enqueue(UnownedJob(job))
    }

    /// Two executors share an execution context when they wrap equal run loops.
    nonisolated
    func isSameExclusiveExecutionContext(other: RunLoopGCDExecutor) -> Bool {
        runLoop.isEqual(other.runLoop)
    }

    /// Reports whether the caller is on the thread that created the executor.
    nonisolated
    func isIsolatingCurrentContext() -> Bool? {
        return pthread_self() == threadId
    }

    /// Traps unless the caller is on the thread that created the executor.
    nonisolated
    func checkIsolated() {
        precondition(pthread_self() == threadId)
    }

    /// Main-thread executors are represented by the main actor's executor;
    /// all others use complex equality on this instance.
    nonisolated
    func asUnownedSerialExecutor() -> UnownedSerialExecutor {
        isMainThread
            ? MainActor.sharedUnownedExecutor
            : UnownedSerialExecutor(complexEquality: self)
    }
}
// MARK: - Delayed enqueue (marked unavailable)

@available(*, unavailable)
extension RunLoopGCDExecutor {

    /// Enqueues `job` on `wrappedQueue` once `delay` has elapsed on `clock`.
    ///
    /// Only `ContinuousClock` and `SuspendingClock` are handled; any other clock hits `fatalError`.
    /// When `tolerance` is supplied, a timer source on `wrappedQueue` is scheduled with the
    /// tolerance as its leeway; otherwise a delayed block is submitted to `wrappedQueue`.
    ///
    /// - Parameters:
    ///   - job: The job to run after the delay.
    ///   - delay: How long to wait, expressed in `clock`'s duration type.
    ///   - tolerance: Optional leeway for the timer.
    ///   - clock: The clock the delay is measured against.
    /// - Note: The QoS is derived from the job priority with a force-unwrap, so a priority whose
    ///   raw value does not map to a `DispatchQoS.QoSClass` traps.
    nonisolated
    public func enqueue<C: Clock>(_ job: consuming ExecutorJob,
                                  after delay: C.Duration,
                                  tolerance: C.Duration? = nil,
                                  clock: C) {
        // wrappedQueue.enqueue(job, after: delay, tolerance: tolerance, clock: clock)
        switch clock {
        case is ContinuousClock:
            // `components` yields (seconds, attoseconds); dividing attoseconds by 1e9 gives nanoseconds.
            let (sec, att) = (delay as! ContinuousClock.Duration).components
            let DISPATCH_UP_OR_MONOTONIC_TIME_MASK:UInt64 = (1 << 63)
            let raw_time = Dispatch.__dispatch_time(DISPATCH_UP_OR_MONOTONIC_TIME_MASK, sec * Int64(bitPattern: Dispatch.NSEC_PER_SEC) + Int64(att / 1_000_000_000))
            if let tol = tolerance as! ContinuousClock.Duration? {
                let jobRef = UnownedJob(job)
                let source = DispatchSource.makeTimerSource(queue: wrappedQueue)
                let (sec, atto) = tol.components
                // The handler captures `source` strongly on purpose: nothing else retains the timer
                // once this method returns, and a timer source that loses its last reference is torn
                // down before it can fire. The source -> handler -> source cycle is broken by the
                // `cancel()` below.
                source.setEventHandler(
                    qos: .init(qosClass: .init(rawValue: .init(UInt32(jobRef.priority.rawValue)))!, relativePriority: 0),
                    flags: [.enforceQoS]
                ) { [source, unowned wrappedQueue] in
                    source.cancel()
                    wrappedQueue.enqueue(jobRef)
                }
                // The repeat interval is `distantFuture`; the handler cancels the source on first fire.
                Dispatch.__dispatch_source_set_timer(source as! DispatchSource, raw_time, Dispatch.DispatchTime.distantFuture.rawValue, UInt64(sec) * Dispatch.NSEC_PER_SEC + UInt64(atto / 1_000_000_000))
                source.activate()
            } else {
                let jobRef = UnownedJob(job)
                let dispatch_block = Namespace._dispatch_block_create_with_qos_class(
                    [.enforceQoS],
                    .init(qosClass: .init(rawValue: .init(UInt32(jobRef.priority.rawValue)))!, relativePriority: 0),
                    { [unowned wrappedQueue] in
                        wrappedQueue.enqueue(jobRef)
                    }
                )
                Dispatch.__dispatch_after(
                    raw_time,
                    wrappedQueue,
                    dispatch_block
                )
            }
        case is SuspendingClock:
            let (sec, att) = (delay as! SuspendingClock.Duration).components
            let dispatchTime = DispatchTime.now().advanced(by: .seconds(Int(sec))).advanced(by: .nanoseconds(Int(att / 1_000_000_000)))
            if let tol = tolerance as! SuspendingClock.Duration? {
                let jobRef = UnownedJob(job)
                let source = DispatchSource.makeTimerSource(queue: wrappedQueue)
                let (sec, atto) = tol.components
                // Strong capture keeps the timer alive until it fires; `cancel()` releases it again.
                source.setEventHandler(
                    qos: .init(qosClass: .init(rawValue: .init(UInt32(jobRef.priority.rawValue)))!, relativePriority: 0),
                    flags: [.enforceQoS]
                ) { [source, unowned wrappedQueue] in
                    source.cancel()
                    wrappedQueue.enqueue(jobRef)
                }
                // NOTE(review): when the tolerance is at least one second, only its whole-second
                // part is used as leeway; the sub-second remainder is dropped.
                let leeway: DispatchTimeInterval = if sec != 0 {
                    .seconds(Int(sec))
                } else {
                    .nanoseconds(Int(atto / 1_000_000_000))
                }
                source.schedule(deadline: dispatchTime, repeating: .never, leeway: leeway)
                source.activate()
            } else {
                let jobRef = UnownedJob(job)
                wrappedQueue.asyncAfter(deadline: dispatchTime,
                                        qos: .init(qosClass: .init(rawValue: .init(UInt32(jobRef.priority.rawValue)))!, relativePriority: 0),
                                        flags: [.enforceQoS]
                ) { [unowned wrappedQueue] in
                    wrappedQueue.enqueue(jobRef)
                }
            }
        default:
            fatalError("unavailable")
        }
    }

    /// Enqueues `job` to run at `instant` by converting it to a delay from `clock.now`
    /// and forwarding to `enqueue(_:after:tolerance:clock:)`.
    ///
    /// - Parameters:
    ///   - job: The job to run.
    ///   - instant: The point in time, on `clock`, at which the job should run.
    ///   - tolerance: Optional leeway, forwarded unchanged.
    ///   - clock: The clock `instant` belongs to.
    nonisolated
    public func enqueue<C: Clock>(_ job: consuming ExecutorJob,
                                  at instant: C.Instant,
                                  tolerance: C.Duration? = nil,
                                  clock: C)
    {
        enqueue(job, after: clock.now.duration(to: instant),
                tolerance: tolerance, clock: clock)
    }
}
import Combine
// MARK: - Combine.Scheduler

/// Combine scheduling support; every requirement forwards to the underlying `runLoop`.
extension RunLoopGCDExecutor: Combine.Scheduler {
    typealias SchedulerTimeType = RunLoop.SchedulerTimeType
    typealias SchedulerOptions = RunLoop.SchedulerOptions

    /// The current time as reported by `runLoop`.
    nonisolated var now: SchedulerTimeType {
        return runLoop.now
    }

    /// The minimum tolerance reported by `runLoop`.
    nonisolated var minimumTolerance: SchedulerTimeType.Stride {
        return runLoop.minimumTolerance
    }

    /// Schedules `action` on `runLoop` via its `schedule(options:_:)`.
    nonisolated func schedule(options: SchedulerOptions?, _ action: @escaping () -> Void) {
        runLoop.schedule(options: options, action)
    }

    /// Schedules `action` on `runLoop` to run after `date`, within `tolerance`.
    nonisolated func schedule(
        after date: SchedulerTimeType,
        tolerance: SchedulerTimeType.Stride,
        options: SchedulerOptions?,
        _ action: @escaping () -> Void
    ) {
        runLoop.schedule(after: date, tolerance: tolerance, options: options, action)
    }

    /// Schedules `action` on `runLoop` to run repeatedly, starting after `date` and recurring every `interval`.
    ///
    /// - Returns: The cancellable produced by `runLoop` for the repeating schedule.
    nonisolated func schedule(
        after date: SchedulerTimeType,
        interval: SchedulerTimeType.Stride,
        tolerance: SchedulerTimeType.Stride,
        options: SchedulerOptions?,
        _ action: @escaping () -> Void
    ) -> any Cancellable {
        return runLoop.schedule(
            after: date,
            interval: interval,
            tolerance: tolerance,
            options: options,
            action
        )
    }
}
#else
/// Placeholder type for the `#else` branch: declared, but unavailable for any use,
/// when the condition noted on the closing `#endif` does not hold.
@available(*, unavailable)
final class RunLoopGCDExecutor {}
#endif // canImport(Darwin) && canImport(ObjectiveC)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment