Last active
December 24, 2025 07:42
-
-
Save pbk20191/84e1164b4e21fc994ac622a8172616ff to your computer and use it in GitHub Desktop.
CFRunLoop derived DispatchSerialQueue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import Foundation | |
| import Dispatch | |
/// Mach-port delegate that drains a dispatch queue whenever its port receives a message.
///
/// `run(_:)` is meant to be used as a thread entry point: it attaches the port to the
/// calling thread's run loop and keeps that run loop alive while the port is valid.
class Receiver: NSObject, NSMachPortDelegate {
    /// Queue handed to `perform`; held weakly, so messages are ignored once it is gone.
    weak var ref: DispatchQueue?
    /// Run loop of the thread executing `run(_:)`; recorded so other code can stop it.
    weak var runLoop: CFRunLoop?
    /// C callback invoked with the queue; it is called repeatedly while it returns true.
    let perform: @convention(c) (DispatchQueue) -> DarwinBoolean

    /// - Parameter perform: C function called with the queue for every drain step.
    init(perform: @convention(c) (DispatchQueue) -> DarwinBoolean) {
        self.perform = perform
    }

    /// Port callback: calls `perform` until it reports false, one autorelease pool per call.
    func handleMachMessage(_ msg: UnsafeMutableRawPointer) {
        guard let ref else { return }
        while autoreleasepool(invoking: { perform(ref) }).boolValue {}
    }

    /// Thread entry point. Registers `port` with the current run loop (common modes) and
    /// runs the loop in default mode until the port is invalidated or the run loop
    /// reports that it could not be started.
    @objc func run(_ port: NSMachPort) {
        autoreleasepool {
            self.runLoop = CFRunLoopGetCurrent()
            port.setDelegate(self)
            RunLoop.current.add(port, forMode: .common)
        }
        // Drain a fresh pool on every pass. A single pool around the whole loop would
        // only be released when the thread is about to exit.
        var keepRunning = true
        while keepRunning {
            keepRunning = autoreleasepool {
                port.isValid && RunLoop.current.run(mode: .default, before: .distantFuture)
            }
        }
    }
}
| import Combine | |
/// Namespace for libdispatch entry points that are resolved at run time with `dlsym`.
enum LibDispatchNameSpace {
    var symbols: symbols { .create() }

    /// Table of C function pointers looked up by name from the loaded images.
    struct symbols: BitwiseCopyable {
        typealias _dispatch_runloop_root_queue_create_4CF_callback = @convention(c) (UnsafePointer<CChar>?, CFOptionFlags) -> Unmanaged<dispatch_queue_serial_t>
        typealias _dispatch_runloop_root_queue_perform_4CF_callback = @convention(c) (dispatch_queue_t) -> DarwinBoolean
        typealias _dispatch_runloop_root_queue_get_port_4CF_callback = @convention(c) (dispatch_queue_t) -> mach_port_t
        typealias dispatch_set_context_callback = @convention(c) (dispatch_object_t, UnsafeMutableRawPointer?) -> Void
        typealias dispatch_get_context_callback = @convention(c) (dispatch_object_t) -> UnsafeMutableRawPointer?
        typealias dispatch_set_finalizer_f_callback = @convention(c) (dispatch_object_t, CFTreeReleaseCallBack?) -> Void

        let _dispatch_runloop_root_queue_create_4CF: _dispatch_runloop_root_queue_create_4CF_callback
        let _dispatch_runloop_root_queue_perform_4CF: _dispatch_runloop_root_queue_perform_4CF_callback
        let _dispatch_runloop_root_queue_get_port_4CF: _dispatch_runloop_root_queue_get_port_4CF_callback
        let dispatch_set_context: dispatch_set_context_callback
        let dispatch_get_context: dispatch_get_context_callback
        let dispatch_set_finalizer_f: dispatch_set_finalizer_f_callback

        /// Resolves every entry point stored in this table.
        ///
        /// Traps with the symbol name when a lookup fails, instead of storing a null
        /// function pointer that would only crash later at its first call.
        private init() {
            // Load the bundle that defines OS_dispatch_queue before resolving symbols.
            Bundle(for: NSClassFromString("OS_dispatch_queue")!).load()

            // (void *)-2 is the dlsym pseudo-handle RTLD_DEFAULT on Darwin.
            let handle = UnsafeMutableRawPointer(bitPattern: -2)

            func resolve<Function>(_ name: String, as type: Function.Type) -> Function {
                guard let address = dlsym(handle, name) else {
                    fatalError("libdispatch symbol not found: \(name)")
                }
                return unsafeBitCast(address, to: type)
            }

            _dispatch_runloop_root_queue_create_4CF = resolve(
                "_dispatch_runloop_root_queue_create_4CF",
                as: _dispatch_runloop_root_queue_create_4CF_callback.self
            )
            _dispatch_runloop_root_queue_perform_4CF = resolve(
                "_dispatch_runloop_root_queue_perform_4CF",
                as: _dispatch_runloop_root_queue_perform_4CF_callback.self
            )
            _dispatch_runloop_root_queue_get_port_4CF = resolve(
                "_dispatch_runloop_root_queue_get_port_4CF",
                as: _dispatch_runloop_root_queue_get_port_4CF_callback.self
            )
            dispatch_set_context = resolve("dispatch_set_context", as: dispatch_set_context_callback.self)
            dispatch_get_context = resolve("dispatch_get_context", as: dispatch_get_context_callback.self)
            dispatch_set_finalizer_f = resolve("dispatch_set_finalizer_f", as: dispatch_set_finalizer_f_callback.self)
        }

        /// Builds a symbol table inside its own autorelease pool.
        static func create() -> Self {
            autoreleasepool {
                .init()
            }
        }

        nonisolated(unsafe)
        static let key = DispatchSpecificKey<AnyCancellable>()

        /// Creates a run-loop root queue and starts a thread named "looper" that services
        /// the queue's Mach port through a `Receiver`.
        ///
        /// An `AnyCancellable` is attached to the queue as an associated object; when it is
        /// cancelled it invalidates the port and stops the servicing run loop.
        func runLoopDispatchQueue() -> DispatchSerialQueue {
            let runLoopQueue = _dispatch_runloop_root_queue_create_4CF(nil, 0).takeRetainedValue()
            let mach = _dispatch_runloop_root_queue_get_port_4CF(runLoopQueue)
            let cfPort = NSMachPort(machPort: mach, options: [])

            let delegate = Receiver(perform: self._dispatch_runloop_root_queue_perform_4CF)
            delegate.ref = runLoopQueue
            cfPort.setDelegate(delegate)

            let teardown = AnyCancellable {
                cfPort.invalidate()
                if let loop = delegate.runLoop {
                    CFRunLoopStop(loop)
                }
            }
            // The delegate's address doubles as the association key.
            objc_setAssociatedObject(
                runLoopQueue,
                Unmanaged.passUnretained(delegate).toOpaque(),
                teardown,
                .OBJC_ASSOCIATION_RETAIN_NONATOMIC
            )

            let th = Thread(target: delegate, selector: #selector(Receiver.run(_:)), object: cfPort)
            th.name = "looper"
            th.start()
            return runLoopQueue
        }
    }
}
| //@_silgen_name("CFRunLoopSourceWakeUpRunLoops") | |
| //func _CFRunLoopSourceWakeUpRunLoops(_ source: CFRunLoopSource) -> Void | |
/// Namespace for CoreFoundation functions that are fetched by name from the framework bundle.
enum CoreFoundationNamespace {
    /// C function pointers obtained through `CFBundleGetFunctionPointersForNames`.
    struct symbols: BitwiseCopyable {
        let retain: CFTreeRetainCallBack
        let release: CFTreeReleaseCallBack
        let hash: CFSetHashCallBack
        let equal: CFArrayEqualCallBack
        let copyDescription: CFBagCopyDescriptionCallBack
        let _CFRunLoopGet2: @convention(c) (CFRunLoop) -> RunLoop

        /// Looks every function up in the `com.apple.CoreFoundation` bundle.
        /// The pointers are reinterpreted without a nil check.
        private init() {
            let coreFoundation = CFBundleGetBundleWithIdentifier("com.apple.CoreFoundation" as CFString)!
            // Order matters: the indices used below refer to positions in this list.
            let names = [
                "CFRetain",
                "CFRelease",
                "CFHash",
                "CFEqual",
                "CFCopyDescription",
                "_CFRunLoopGet2",
            ]
            var pointers = ContiguousArray<UnsafeMutableRawPointer?>(repeating: nil, count: names.count)
            pointers.withUnsafeMutableBufferPointer { buffer in
                CFBundleGetFunctionPointersForNames(coreFoundation, names as CFArray, buffer.baseAddress!)
            }

            retain = unsafeBitCast(pointers[0], to: CFTreeRetainCallBack.self)
            release = unsafeBitCast(pointers[1], to: CFTreeReleaseCallBack.self)
            hash = unsafeBitCast(pointers[2], to: CFSetHashCallBack.self)
            equal = unsafeBitCast(pointers[3], to: CFArrayEqualCallBack.self)
            copyDescription = unsafeBitCast(pointers[4], to: CFBagCopyDescriptionCallBack.self)
            _CFRunLoopGet2 = unsafeBitCast(pointers[5], to: (@convention(c) (CFRunLoop) -> RunLoop).self)
        }

        /// Builds a symbol table inside its own autorelease pool.
        static func create() -> Self {
            autoreleasepool {
                .init()
            }
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import Synchronization | |
// Serial/task executor bound to the RunLoop that is current when the instance is created.
// Instances whose run loop is not RunLoop.main queue jobs in a priority-ordered CFBinaryHeap
// and drain them from a CFRunLoopSource; main-run-loop instances forward jobs to MainActor
// (see `enqueue(_:)`).
| final class CurrentRunLoopExecutor: SerialExecutor, TaskExecutor { | |
| // RunLoopExecutor, SchedulingExecutor | |
// Builds an empty CFBinaryHeap that retains/releases its elements with the CFType dictionary
// key callbacks and orders them by job priority, so the higher-priority job is the heap minimum.
| private static func makeBinaryHeap() -> CFBinaryHeap { | |
| var heapCb = kCFStringBinaryHeapCallBacks | |
| heapCb.retain = kCFTypeDictionaryKeyCallBacks.retain | |
| heapCb.release = kCFTypeDictionaryKeyCallBacks.release | |
| heapCb.copyDescription = kCFTypeDictionaryKeyCallBacks.copyDescription | |
| heapCb.compare = { | |
| // reverse order | |
| let lhs = Unmanaged<AnyObject>.fromOpaque($0!).takeUnretainedValue() as! UnownedJob | |
| let rhs = Unmanaged<AnyObject>.fromOpaque($1!).takeUnretainedValue() as! UnownedJob | |
| let _ = $2 | |
| if rhs.priority < lhs.priority { | |
| return .compareLessThan | |
| } | |
| if rhs.priority > lhs.priority { | |
| return .compareGreaterThan | |
| } | |
| return .compareEqualTo | |
| } | |
| return CFBinaryHeapCreate(nil, 0, &heapCb, nil)! | |
| } | |
// State shared with the run-loop source: the pending-job heap, the lock guarding it and the
// executor references passed to jobs when they run. Also used as the Timer target by the
// Combine scheduling methods of this type.
| class RunLoopContextStore { | |
| var heap:CFBinaryHeap = CurrentRunLoopExecutor.makeBinaryHeap() | |
| let lock = NSRecursiveLock() | |
| var serial:UnownedSerialExecutor! | |
| var task:UnownedTaskExecutor? | |
// Incremented while run()/runUntil(_:) is executing and decremented when it leaves.
| var enterCount = 0 | |
// Traps if the store is destroyed while jobs are still queued.
| deinit { | |
| precondition(CFBinaryHeapGetCount(heap) == 0, "leak of unprocessed tasks") | |
| } | |
// Timer callback for repeating schedules: runs the RepeatInfo block while its flag is set,
// otherwise invalidates the timer.
| @objc func repeatedBlock(_ timer:Timer) { | |
| let info = unsafeDowncast(timer.userInfo as AnyObject, to: RepeatInfo.self) | |
| if info.boolean.load(ordering: .acquiring) { | |
| info.block() | |
| } else { | |
| // counter.subtract(1, ordering: .relaxed) | |
| timer.invalidate() | |
| } | |
| } | |
// Timer callback for one-shot schedules: reinterprets userInfo as an Objective-C block
// and invokes it; does nothing when userInfo is nil.
| @objc func perform( timer:Timer) { | |
| let block = timer.userInfo.flatMap { | |
| unsafeBitCast($0 as AnyObject, to: (@convention(block) () -> Void).self) | |
| } | |
| if let block { | |
| block() | |
| // counter.subtract(1, ordering: .relaxed) | |
| } else { | |
| // isActive = false | |
| // CFRunLoopStop(CFRunLoopGetCurrent()) | |
| } | |
| } | |
| } | |
| struct RunningInfo { | |
| var task:UnownedTaskExecutor? | |
| var serial:UnownedSerialExecutor | |
| } | |
// Task-local reference to the executor whose run()/runUntil(_:) is currently executing.
| @TaskLocal | |
| private static var _current:Unmanaged<CurrentRunLoopExecutor>? | |
// Captured at initialization time: the run loop that was current when the executor was created.
| nonisolated(unsafe) | |
| private let runLoop = RunLoop.current | |
| let _isMainExecutor:Bool | |
| nonisolated(unsafe) | |
| private let store:RunLoopContextStore | |
// Run-loop source whose `info` is a newly created RunLoopContextStore. Its retain/release/
// hash/equal/copyDescription callbacks are the CoreFoundation functions looked up by
// CoreFoundationNamespace; `perform` drains the store's heap.
| nonisolated(unsafe) | |
| private let source:CFRunLoopSource = { | |
| let sym = CoreFoundationNamespace.symbols.create() | |
| var source = CFRunLoopSourceContext() | |
| source.retain = sym.retain | |
| source.release = sym.release | |
| source.copyDescription = sym.copyDescription | |
| source.hash = sym.hash | |
| source.equal = sym.equal | |
| source.perform = { | |
| let store = Unmanaged<RunLoopContextStore>.fromOpaque($0!).takeUnretainedValue() | |
| var newHeap = CurrentRunLoopExecutor.makeBinaryHeap() | |
// NOTE(review): `info` is never read after this line.
| var info = RunningInfo.init(task: store.task, serial: store.serial) | |
// Swap the pending heap for an empty one under the lock; the detached jobs are then run
// without holding the lock.
| store.lock.withLock { | |
| swap(&store.heap, &newHeap) | |
| } | |
// CFBinaryHeapGetMinimumIfPresent is reinterpreted so its out-parameter can be an AnyObject slot.
| let functionCall = unsafeBitCast(CFBinaryHeapGetMinimumIfPresent, to: ((CFBinaryHeap, AutoreleasingUnsafeMutablePointer<AnyObject?>) -> Bool).self) | |
| weak var value:AnyObject? | |
| let taskExecutor = store.task | |
| let serialExecutor = store.serial! | |
// Pop jobs in heap order until the detached heap is empty.
| while functionCall(newHeap, &value), let job = value.flatMap({ ($0 as! UnownedJob) }) { | |
| value = nil | |
| CFBinaryHeapRemoveMinimumValue(newHeap) | |
| if let taskExecutor { | |
| job.runSynchronously(isolatedTo: serialExecutor, taskExecutor: taskExecutor) | |
| } else { | |
| job.runSynchronously(on: serialExecutor) | |
| } | |
| } | |
| } | |
| let buffer = RunLoopContextStore() | |
| source.info = Unmanaged.passUnretained(buffer).toOpaque() | |
| return CFRunLoopSourceCreate(nil, 0, &source) | |
| }() | |
// Helper object whose `invoke` records the task-local `_current` value at the time it is performed.
| class Invoker:NSObject { | |
| private(set) var invoked = false | |
| var storage:CurrentRunLoopExecutor? | |
| @objc func invoke() { | |
| invoked = true | |
| storage = CurrentRunLoopExecutor._current?.takeUnretainedValue() | |
| } | |
| } | |
// Schedules an Invoker on the current thread and spins the run loop in default mode until it
// has fired, then returns the executor it observed, or nil if there was none.
| private static func peekLocal() -> CurrentRunLoopExecutor? { | |
| let invoker = Invoker() | |
| invoker.perform(#selector(invoker.invoke), on: Thread.current, with: nil, waitUntilDone: false) | |
| while !invoker.invoked { | |
| let future = CFDateGetAbsoluteTime(Date.distantFuture as CFDate) | |
| CFRunLoopRunInMode(.defaultMode, future, true) | |
| } | |
| if let ref = invoker.storage { | |
| return ref | |
| } | |
| return nil | |
| } | |
// Returns an executor for the calling thread's run loop: a new instance on the main thread,
// the task-local executor when it is bound to the current run loop, the executor found by
// peekLocal() when the run loop is already running in some mode, and a new instance otherwise.
| internal static func currentExecutor() -> CurrentRunLoopExecutor { | |
| if Thread.isMainThread { | |
| // we are safe | |
| return CurrentRunLoopExecutor() | |
| } else if let t = _current?.takeUnretainedValue(), t.runLoop == .current { | |
| // current RunLooperRunner === currentRunLoop | |
| // this is also safe | |
| return t | |
| } else if RunLoop.current.currentMode != nil, let local = peekLocal() { | |
| return local | |
| } else { | |
| /* | |
| 1. RunLoop is active but does not have `CurrentRunLoopExecutor` in nested loop | |
| 2. RunLoop is inactive -> which should always create new one | |
| */ | |
| return CurrentRunLoopExecutor() | |
| } | |
| } | |
// Recovers the RunLoopContextStore from the source's context and stores this executor's
// unowned serial/task executor references in it.
| private init() { | |
| var context = CFRunLoopSourceContext() | |
| CFRunLoopSourceGetContext(source, &context) | |
| store = Unmanaged.fromOpaque(context.info).takeUnretainedValue() | |
| _isMainExecutor = runLoop === RunLoop.main | |
| store.serial = asUnownedSerialExecutor() | |
| store.task = asUnownedTaskExecutor() | |
| } | |
// Moves every job queued on this executor into `current`'s heap and signals `current`'s source.
| private func transferJob(to current:CurrentRunLoopExecutor) { | |
| var newHeap = Self.makeBinaryHeap() | |
| self.store.lock.withLock { | |
| swap(&self.store.heap, &newHeap) | |
| } | |
| current.store.lock.withLock { | |
| CFBinaryHeapApplyFunction(newHeap, { | |
| CFBinaryHeapAddValue(Unmanaged<CFBinaryHeap>.fromOpaque($1!).takeUnretainedValue(), $0) | |
| }, Unmanaged.passUnretained(current.store.heap).toOpaque()) | |
| } | |
| CFRunLoopSourceSignal(current.source) | |
| } | |
// Runs the bound run loop in default mode with this executor installed as `_current`.
// Throws CancellationError when called while a different run loop is current, or when the
// source is not installed yet and peekLocal() finds another executor already active (queued
// jobs are handed to that executor first). The source is added for the duration of the call
// if it was not installed already. The loop ends as soon as CFRunLoopRunInMode returns
// anything other than .timedOut.
| public func run() throws { | |
| guard runLoop === RunLoop.current else { | |
| throw CancellationError() | |
| } | |
| let shouldRemoveSource:Bool | |
| if !CFRunLoopContainsSource(runLoop.getCFRunLoop(), source, .defaultMode) { | |
| if runLoop.currentMode != nil, let t = Self.peekLocal(), t !== self { | |
| // already installed runloop detected | |
| transferJob(to: t) | |
| throw CancellationError() | |
| } | |
| CFRunLoopAddSource(runLoop.getCFRunLoop(), source, .defaultMode) | |
| shouldRemoveSource = true | |
| } else { | |
| shouldRemoveSource = false | |
| } | |
| defer { | |
| if shouldRemoveSource { | |
| CFRunLoopRemoveSource(runLoop.getCFRunLoop(), source, .defaultMode) | |
| } | |
| } | |
| do { | |
| store.enterCount += 1 | |
| defer { store.enterCount -= 1} | |
| CurrentRunLoopExecutor.$_current.withValue(.passUnretained(self)) { | |
| let future = CFDateGetAbsoluteTime(Date.distantFuture as CFDate) | |
| while CFRunLoopRunInMode(.defaultMode, future, false) == .timedOut { | |
| } | |
| } | |
| } | |
| } | |
// Same set-up and error conditions as run(), but runs the run loop one pass at a time in
// default mode until `condition` returns true. `condition` is checked before every pass.
| public func runUntil(_ condition: () -> Bool) throws { | |
| guard runLoop === RunLoop.current else { | |
| throw CancellationError() | |
| } | |
| let shouldRemoveSource:Bool | |
| if !CFRunLoopContainsSource(runLoop.getCFRunLoop(), source, .defaultMode) { | |
| if runLoop.currentMode != nil, let t = Self.peekLocal(), t !== self { | |
| // already installed runloop detected | |
| transferJob(to: t) | |
| throw CancellationError() | |
| } | |
| CFRunLoopAddSource(runLoop.getCFRunLoop(), source, .defaultMode) | |
| shouldRemoveSource = true | |
| } else { | |
| shouldRemoveSource = false | |
| } | |
| defer { | |
| if shouldRemoveSource { | |
| CFRunLoopRemoveSource(runLoop.getCFRunLoop(), source, .defaultMode) | |
| } | |
| } | |
| do { | |
| store.enterCount += 1 | |
| defer { | |
| store.enterCount -= 1 | |
| } | |
| while !condition() { | |
| CurrentRunLoopExecutor.$_current.withValue(.passUnretained(self)) { | |
| RunLoop.current.run(mode: .default, before: .distantFuture) | |
| } | |
| } | |
| } | |
| } | |
// Asks the bound run loop to stop via CFRunLoopStop.
| public func stop() { | |
| CFRunLoopStop(runLoop.getCFRunLoop()) | |
| } | |
// Isolation checks: the caller is considered isolated when the bound run loop is the current one.
| func checkIsolated() { | |
| precondition(runLoop === RunLoop.current) | |
| } | |
| func isIsolatingCurrentContext() -> Bool? { | |
| return runLoop === RunLoop.current | |
| } | |
| func asUnownedSerialExecutor() -> UnownedSerialExecutor { | |
| UnownedSerialExecutor(complexEquality: self) | |
| } | |
// Two executors share an execution context when they are bound to the same RunLoop object.
| func isSameExclusiveExecutionContext(other: CurrentRunLoopExecutor) -> Bool { | |
| runLoop === other.runLoop | |
| } | |
// Main-run-loop executors forward the job to MainActor. Others add the job to the heap
// under the lock, signal the source and wake the bound run loop.
| func enqueue(_ job: consuming ExecutorJob) { | |
| if _isMainExecutor { | |
| MainActor.shared.enqueue(UnownedJob(job)) | |
| } else { | |
| let job = UnownedJob(job) | |
| let ref = job as AnyObject | |
| self.store.lock.withLock { | |
| let t = Unmanaged.passUnretained(ref).toOpaque() | |
| CFBinaryHeapAddValue(self.store.heap, t) | |
| } | |
| CFRunLoopSourceSignal(source) | |
| CFRunLoopWakeUp(runLoop.getCFRunLoop()) | |
| // let ref = asUnownedSerialExecutor() | |
| // runLoop.perform { | |
| // job.runSynchronously(on: ref) | |
| // } | |
| } | |
| } | |
// NOTE(review): always false, independent of `_isMainExecutor` — confirm this is intended.
| public var isMainExecutor: Bool { false } | |
| // var asScheduling: (any SchedulingExecutor)? { | |
| // return self | |
| // } | |
| // | |
// Unavailable placeholder for clock-based scheduling; the body only evaluates expressions
// and never enqueues the job.
| @available(*,unavailable) | |
| public func enqueue<C: Clock>(_ job: consuming ExecutorJob, | |
| at instant: C.Instant, | |
| tolerance: C.Duration? = nil, | |
| clock: C) | |
| { | |
| if let continuous = clock as? ContinuousClock { | |
| let i = (instant as! ContinuousClock.Instant) | |
| continuous.now.duration(to: i) | |
| } else if let suspending = clock as? SuspendingClock { | |
| (instant as! SuspendingClock.Instant) | |
| } else { | |
| clock | |
| } | |
| // _dispatchEnqueue(job, at: instant, tolerance: tolerance, clock: clock, | |
| // executor: self, global: false) | |
| } | |
| } | |
| import Combine | |
// MARK: - Combine.Scheduler

/// Lets the executor be used as a Combine scheduler backed by its bound run loop.
///
/// Main-run-loop executors delegate to `RunLoop`'s own scheduler implementation. Other
/// executors install `Timer`s on the bound run loop, with the context store as target.
extension CurrentRunLoopExecutor: Combine.Scheduler {
    typealias SchedulerOptions = RunLoop.SchedulerOptions
    typealias SchedulerTimeType = RunLoop.SchedulerTimeType

    /// Performs `action` on the bound run loop as soon as possible.
    func schedule(options: SchedulerOptions?, _ action: @escaping () -> Void) {
        // Both the main and the non-main case forward to the run loop unchanged.
        runLoop.schedule(options: options, action)
    }

    /// Performs `action` once at `date`, allowing the timer to slip by `tolerance`.
    func schedule(
        after date: SchedulerTimeType,
        tolerance: SchedulerTimeType.Stride,
        options: SchedulerOptions?,
        _ action: @escaping () -> Void
    ) {
        if _isMainExecutor {
            return runLoop.schedule(after: date, tolerance: tolerance, options: options, action)
        }
        // The closure travels as an Objective-C block in the timer's userInfo and is
        // unpacked again in RunLoopContextStore.perform(timer:).
        let block: @convention(block) () -> Void = action
        let timer = Timer(
            fireAt: date.date,
            interval: 0,
            target: store,
            selector: #selector(RunLoopContextStore.perform(timer:)),
            userInfo: block,
            repeats: false
        )
        // Honour the caller's tolerance instead of dropping it.
        timer.tolerance = tolerance.timeInterval
        runLoop.add(timer, forMode: .common)
    }

    /// Performs `action` at `date` and then every `interval`, until the returned
    /// cancellable is cancelled.
    func schedule(
        after date: SchedulerTimeType,
        interval: SchedulerTimeType.Stride,
        tolerance: SchedulerTimeType.Stride,
        options: SchedulerOptions?,
        _ action: @escaping () -> Void
    ) -> any Cancellable {
        if _isMainExecutor {
            return runLoop.schedule(after: date, interval: interval, tolerance: tolerance, options: options, action)
        }
        let info = RepeatInfo(block: action)
        let timer = Timer(
            fireAt: date.date,
            interval: interval.timeInterval,
            target: store,
            selector: #selector(RunLoopContextStore.repeatedBlock(_:)),
            userInfo: info,
            repeats: true
        )
        // Honour the caller's tolerance instead of dropping it.
        timer.tolerance = tolerance.timeInterval
        info.timer = timer
        runLoop.add(timer, forMode: .common)
        return AnyCancellable(info)
    }

    var minimumTolerance: SchedulerTimeType.Stride {
        runLoop.minimumTolerance
    }

    var now: SchedulerTimeType {
        runLoop.now
    }

    /// Cancellation token for a repeating schedule; carried in the timer's userInfo.
    class RepeatInfo: Combine.Cancellable {
        /// True while the schedule is active; cleared exactly once by `cancel()`.
        let boolean = Atomic(true)
        weak var timer: Timer?
        let block: () -> Void

        init(block: @escaping () -> Void) {
            self.block = block
        }

        /// Deactivates the schedule. The first call also moves the timer's fire date to
        /// now, so its next firing sees the cleared flag and invalidates the timer.
        func cancel() {
            let result = boolean.compareExchange(expected: true, desired: false, ordering: .acquiringAndReleasing)
            if result.exchanged {
                timer?.fireDate = .init()
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment