Last active
February 9, 2026 00:20
-
-
Save pbk20191/84e1164b4e21fc994ac622a8172616ff to your computer and use it in GitHub Desktop.
CFRunLoop derived DispatchSerialQueue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // | |
| // RunLoopGCDExecutor.swift | |
| // SampleSample | |
| // | |
| // Created by 박병관 on 1/13/26. | |
| // | |
| import Foundation | |
| import Dispatch | |
| import _Concurrency | |
/// Executor bound to the run loop of the thread that creates it.
///
/// Jobs are forwarded to a serial queue whose target is the dispatch queue
/// obtained from the run loop through `peekDefaultRunLoopQueue()`.
final class _RunLoopGCDExecutor {
    /// Run loop that was current when the executor was created.
    nonisolated(unsafe)
    private let runLoop = RunLoop.current
    /// Queue returned by `peekDefaultRunLoopQueue()` for the creating run loop.
    private let queue: DispatchSerialQueue = RunLoop.current.getCFRunLoop().peekDefaultRunLoopQueue()
    /// Serial queue targeting the run-loop queue; `enqueue` submits jobs here.
    private let wrappedQueue: DispatchSerialQueue = DispatchSerialQueue(
        label: "D",
        target: RunLoop.current.getCFRunLoop().peekDefaultRunLoopQueue()
    )

    /// Requests that the captured run loop stop.
    public func stop() {
        let loop = runLoop.getCFRunLoop()
        CFRunLoopStop(loop)
    }

    /// Runs the captured run loop in the default mode until it reports
    /// `.finished` or `.stopped`.
    ///
    /// - Throws: `CancellationError` when called from inside a Swift task or
    ///   when the current run loop is not the one captured at creation.
    public func run() throws {
        let insideTask = withUnsafeCurrentTask { $0 != nil }
        if insideTask {
            throw CancellationError()
        }
        if !runLoop.isEqual(to: RunLoop.current) {
            throw CancellationError()
        }

        // Keep the queue's mach port registered with the run loop while it spins.
        let machPort = LibDispatchNameSpace.symbols.create()._dispatch_runloop_root_queue_get_port_4CF(queue)
        let port = NSMachPort.port(withMachPort: machPort, options: [])
        runLoop.add(port, forMode: .default)
        defer {
            runLoop.remove(port, forMode: .default)
        }

        var outcome: CFRunLoopRunResult
        repeat {
            outcome = CFRunLoopRunInMode(.defaultMode, Date.distantFuture.timeIntervalSinceNow, false)
        } while !(outcome == .finished || outcome == .stopped)
    }

    /// Runs the captured run loop one pass at a time until `condition` is true.
    ///
    /// - Parameter condition: Checked before every pass.
    /// - Throws: `CancellationError` when called from inside a Swift task, when
    ///   the current run loop is not the captured one, or when the run loop
    ///   reports that it could not be run.
    public func runUntil(_ condition: () -> Bool) throws {
        let insideTask = withUnsafeCurrentTask { $0 != nil }
        if insideTask {
            throw CancellationError()
        }
        if !runLoop.isEqual(to: RunLoop.current) {
            throw CancellationError()
        }

        let machPort = LibDispatchNameSpace.symbols.create()._dispatch_runloop_root_queue_get_port_4CF(queue)
        let port = NSMachPort.port(withMachPort: machPort, options: [])
        runLoop.add(port, forMode: .default)
        defer {
            runLoop.remove(port, forMode: .default)
        }

        while !condition() {
            guard runLoop.run(mode: .default, before: .distantFuture) else {
                throw CancellationError()
            }
        }
    }
}
extension _RunLoopGCDExecutor: SerialExecutor, TaskExecutor {
    /// Submits the job to the serial queue that targets the run-loop queue.
    func enqueue(_ job: consuming ExecutorJob) {
        wrappedQueue.enqueue(job)
    }

    /// Two executors share an exclusive execution context when they wrap
    /// equal run loops.
    ///
    /// The parameter is this executor's own type so the method satisfies the
    /// `SerialExecutor` requirement, which is declared in terms of `Self`.
    /// Previously it named a different type and was only an unrelated overload.
    func isSameExclusiveExecutionContext(other: _RunLoopGCDExecutor) -> Bool {
        return runLoop.isEqual(other.runLoop)
    }

    /// Reports whether the calling thread's run loop is the captured one.
    func isIsolatingCurrentContext() -> Bool? {
        runLoop.isEqual(RunLoop.current)
    }

    /// Traps unless the calling thread's run loop is the captured one.
    func checkIsolated() {
        precondition(runLoop.isEqual(RunLoop.current))
    }
}
fileprivate extension CFRunLoop {
    /// Loads the pointer stored 2664 bytes into this run loop object and
    /// returns it as an unretained `CFSet`.
    ///
    /// NOTE(review): the offset is hard-coded against CoreFoundation's private
    /// layout (presumably the run loop's mode set) — confirm per OS release
    /// and architecture.
    private func peekRunLoopModeSet() -> CFSet {
        let base = unsafeBitCast(self, to: UnsafeMutableRawPointer.self)
        let field = base.advanced(by: 2664).load(as: UnsafeRawPointer?.self)
        return Unmanaged<CFSet>.fromOpaque(field!).takeUnretainedValue()
    }

    /// Scans a copy of the mode set for the element whose name field (the
    /// pointer at byte offset 80) equals `CFRunLoopMode.defaultMode`.
    ///
    /// Traps when no such element exists.
    private func peekDefaultRunLoopMode() -> AnyObject {
        let modes = CFSetCreateCopy(nil, peekRunLoopModeSet())
        var match: AnyObject? = nil
        CFSetApplyFunction(modes, { element, context in
            // The context points at `match`, whose storage type is
            // `AnyObject?`; bind it as such so an empty slot is read as `nil`
            // instead of as a null non-optional reference.
            guard let element,
                  let slot = context?.assumingMemoryBound(to: AnyObject?.self),
                  slot.pointee == nil
            else {
                return
            }
            // Skip elements without a name instead of handing nil to CFEqual.
            guard let namePointer = element.advanced(by: 80).load(as: UnsafeRawPointer?.self) else {
                return
            }
            let modeName = Unmanaged<CFString>.fromOpaque(namePointer).takeUnretainedValue()
            if CFEqual(modeName, CFRunLoopMode.defaultMode.rawValue) {
                slot.pointee = Unmanaged<AnyObject>.fromOpaque(element).takeUnretainedValue()
            }
        }, &match)
        guard let match else {
            preconditionFailure("default run loop mode not found in the mode set")
        }
        return match
    }

    /// Returns the object referenced by the pointer at byte offset 160 of the
    /// default-mode element, as an unretained `DispatchSerialQueue`.
    ///
    /// NOTE(review): relies on the same private layout assumptions as the
    /// helpers above; traps when the field is null.
    func peekDefaultRunLoopQueue() -> DispatchSerialQueue {
        let mode = peekDefaultRunLoopMode()
        let field = Unmanaged.passUnretained(mode).toOpaque()
            .advanced(by: 160)
            .load(as: UnsafeRawPointer?.self)
        return Unmanaged<DispatchSerialQueue>.fromOpaque(field!).takeUnretainedValue()
    }
}
extension _RunLoopGCDExecutor {
    /// Namespace for libdispatch entry points that are resolved at run time.
    enum LibDispatchNameSpace {
        var symbols: symbols { .create() }

        /// Table of C function pointers looked up with `dlsym`.
        struct symbols: BitwiseCopyable {
            typealias _dispatch_runloop_root_queue_create_4CF_callback = @convention(c) (UnsafePointer<CChar>?, CFOptionFlags) -> Unmanaged<dispatch_queue_serial_t>
            typealias _dispatch_runloop_root_queue_perform_4CF_callback = @convention(c) (dispatch_queue_t) -> DarwinBoolean
            typealias _dispatch_runloop_root_queue_get_port_4CF_callback = @convention(c) (dispatch_queue_t) -> mach_port_t
            typealias dispatch_set_context_callback = @convention(c) (dispatch_object_t, UnsafeMutableRawPointer?) -> Void
            typealias dispatch_get_context_callback = @convention(c) (dispatch_object_t) -> UnsafeMutableRawPointer?
            typealias dispatch_set_finalizer_f_callback = @convention(c) (dispatch_object_t, CFTreeReleaseCallBack?) -> Void

            let _dispatch_runloop_root_queue_create_4CF: _dispatch_runloop_root_queue_create_4CF_callback
            let _dispatch_runloop_root_queue_perform_4CF: _dispatch_runloop_root_queue_perform_4CF_callback
            let _dispatch_runloop_root_queue_get_port_4CF: _dispatch_runloop_root_queue_get_port_4CF_callback
            let dispatch_set_context: dispatch_set_context_callback
            let dispatch_get_context: dispatch_get_context_callback
            let dispatch_set_finalizer_f: dispatch_set_finalizer_f_callback

            /// Looks up `name` with `dlsym` and reinterprets the address as `type`.
            ///
            /// Traps with the symbol name when the lookup fails, so a missing
            /// symbol is reported here rather than as a call through a null
            /// function pointer later.
            private static func resolve<Function>(_ name: String, as type: Function.Type) -> Function {
                // Bit pattern -2 is the handle the original code passed
                // (RTLD_DEFAULT in Darwin's <dlfcn.h>).
                guard let address = dlsym(UnsafeMutableRawPointer(bitPattern: -2), name) else {
                    preconditionFailure("unable to resolve libdispatch symbol \(name)")
                }
                return unsafeBitCast(address, to: type)
            }

            /// Loads the bundle that vends `OS_dispatch_queue`, then resolves
            /// every function pointer stored in this table.
            private init() {
                Bundle(for: NSClassFromString("OS_dispatch_queue")!).load()
                _dispatch_runloop_root_queue_create_4CF = Self.resolve(
                    "_dispatch_runloop_root_queue_create_4CF",
                    as: _dispatch_runloop_root_queue_create_4CF_callback.self
                )
                _dispatch_runloop_root_queue_perform_4CF = Self.resolve(
                    "_dispatch_runloop_root_queue_perform_4CF",
                    as: _dispatch_runloop_root_queue_perform_4CF_callback.self
                )
                _dispatch_runloop_root_queue_get_port_4CF = Self.resolve(
                    "_dispatch_runloop_root_queue_get_port_4CF",
                    as: _dispatch_runloop_root_queue_get_port_4CF_callback.self
                )
                dispatch_set_context = Self.resolve("dispatch_set_context", as: dispatch_set_context_callback.self)
                dispatch_get_context = Self.resolve("dispatch_get_context", as: dispatch_get_context_callback.self)
                dispatch_set_finalizer_f = Self.resolve("dispatch_set_finalizer_f", as: dispatch_set_finalizer_f_callback.self)
            }

            /// Builds a fresh table inside its own autorelease pool.
            static func create() -> Self {
                autoreleasepool {
                    .init()
                }
            }
        }
    }
}
extension _RunLoopGCDExecutor {
    /// Namespace for CoreFoundation functions that are fetched by name from
    /// the CoreFoundation bundle.
    enum CoreFoundationNamespace {
        /// Table of C function pointers obtained through
        /// `CFBundleGetFunctionPointersForNames`.
        struct symbols: BitwiseCopyable {
            let retain: CFTreeRetainCallBack
            let release: CFTreeReleaseCallBack
            let hash: CFSetHashCallBack
            let equal: CFArrayEqualCallBack
            let copyDescription: CFBagCopyDescriptionCallBack
            let _CFRunLoopGet2: @convention(c) (CFRunLoop) -> RunLoop

            /// Fetches all six pointers in one call; the slot order below
            /// matches the order of `names`.
            private init() {
                let bundle = CFBundleGetBundleWithIdentifier("com.apple.CoreFoundation" as CFString)!
                let names = [
                    "CFRetain",
                    "CFRelease",
                    "CFHash",
                    "CFEqual",
                    "CFCopyDescription",
                    "_CFRunLoopGet2",
                ]
                var pointers = ContiguousArray<UnsafeMutableRawPointer?>(repeating: nil, count: names.count)
                pointers.withUnsafeMutableBufferPointer { buffer in
                    CFBundleGetFunctionPointersForNames(bundle, names as CFArray, buffer.baseAddress!)
                }
                retain = unsafeBitCast(pointers[0], to: CFTreeRetainCallBack.self)
                release = unsafeBitCast(pointers[1], to: CFTreeReleaseCallBack.self)
                hash = unsafeBitCast(pointers[2], to: CFSetHashCallBack.self)
                equal = unsafeBitCast(pointers[3], to: CFArrayEqualCallBack.self)
                copyDescription = unsafeBitCast(pointers[4], to: CFBagCopyDescriptionCallBack.self)
                _CFRunLoopGet2 = unsafeBitCast(pointers[5], to: (@convention(c) (CFRunLoop) -> RunLoop).self)
            }

            /// Builds a fresh table inside its own autorelease pool.
            static func create() -> Self {
                autoreleasepool {
                    .init()
                }
            }
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import Foundation | |
| import Dispatch | |
/// Mach-port delegate that drains a dispatch queue whenever its port
/// receives a message, and hosts the run loop of the servicing thread.
class Receiver: NSObject, NSMachPortDelegate {
    /// Queue handed to `perform` on every message; nothing happens once it is gone.
    weak var ref: DispatchQueue?
    /// Run loop of the thread executing `run(_:)`; set when that method starts.
    weak var runLoop: CFRunLoop?
    /// C function called repeatedly with `ref` for as long as it returns true.
    let perform: @convention(c) (DispatchQueue) -> DarwinBoolean

    /// Calls `perform` on the queue until it returns false, with each call in
    /// its own autorelease pool.
    func handleMachMessage(_ msg: UnsafeMutableRawPointer) {
        guard let ref else { return }
        while autoreleasepool(invoking: {
            perform(ref)
        }) == true {}
    }

    /// - Parameter perform: Function invoked with the queue on every message.
    init(perform: @convention(c) (DispatchQueue) -> DarwinBoolean) {
        self.perform = perform
    }

    /// Thread entry point: registers `port` with the current run loop and
    /// keeps running that loop in the default mode while the port is valid
    /// and the loop reports that it ran.
    ///
    /// - Parameter port: The mach port whose messages this receiver handles.
    @objc func run(_ port: NSMachPort) {
        autoreleasepool {
            self.runLoop = CFRunLoopGetCurrent()
            port.setDelegate(self)
            RunLoop.current.add(port, forMode: .common)
        }
        // Use one pool per pass so objects autoreleased while servicing the
        // loop are released between passes, not when the thread finally exits.
        var keepRunning = true
        while keepRunning {
            keepRunning = autoreleasepool {
                port.isValid && RunLoop.current.run(mode: .default, before: .distantFuture)
            }
        }
    }
}
| import Combine | |
/// Namespace for libdispatch entry points that are resolved at run time.
enum LibDispatchNameSpace {
    var symbols: symbols { .create() }

    /// Table of C function pointers looked up with `dlsym`.
    struct symbols: BitwiseCopyable {
        typealias _dispatch_runloop_root_queue_create_4CF_callback = @convention(c) (UnsafePointer<CChar>?, CFOptionFlags) -> Unmanaged<dispatch_queue_serial_t>
        typealias _dispatch_runloop_root_queue_perform_4CF_callback = @convention(c) (dispatch_queue_t) -> DarwinBoolean
        typealias _dispatch_runloop_root_queue_get_port_4CF_callback = @convention(c) (dispatch_queue_t) -> mach_port_t
        typealias dispatch_set_context_callback = @convention(c) (dispatch_object_t, UnsafeMutableRawPointer?) -> Void
        typealias dispatch_get_context_callback = @convention(c) (dispatch_object_t) -> UnsafeMutableRawPointer?
        typealias dispatch_set_finalizer_f_callback = @convention(c) (dispatch_object_t, CFTreeReleaseCallBack?) -> Void

        let _dispatch_runloop_root_queue_create_4CF: _dispatch_runloop_root_queue_create_4CF_callback
        let _dispatch_runloop_root_queue_perform_4CF: _dispatch_runloop_root_queue_perform_4CF_callback
        let _dispatch_runloop_root_queue_get_port_4CF: _dispatch_runloop_root_queue_get_port_4CF_callback
        let dispatch_set_context: dispatch_set_context_callback
        let dispatch_get_context: dispatch_get_context_callback
        let dispatch_set_finalizer_f: dispatch_set_finalizer_f_callback

        /// Looks up `name` with `dlsym` and reinterprets the address as `type`.
        ///
        /// Traps with the symbol name when the lookup fails, so a missing
        /// symbol is reported here rather than as a call through a null
        /// function pointer later.
        private static func resolve<Function>(_ name: String, as type: Function.Type) -> Function {
            // Bit pattern -2 is the handle the original code passed
            // (RTLD_DEFAULT in Darwin's <dlfcn.h>).
            guard let address = dlsym(UnsafeMutableRawPointer(bitPattern: -2), name) else {
                preconditionFailure("unable to resolve libdispatch symbol \(name)")
            }
            return unsafeBitCast(address, to: type)
        }

        /// Loads the bundle that vends `OS_dispatch_queue`, then resolves
        /// every function pointer stored in this table.
        private init() {
            Bundle(for: NSClassFromString("OS_dispatch_queue")!).load()
            _dispatch_runloop_root_queue_create_4CF = Self.resolve(
                "_dispatch_runloop_root_queue_create_4CF",
                as: _dispatch_runloop_root_queue_create_4CF_callback.self
            )
            _dispatch_runloop_root_queue_perform_4CF = Self.resolve(
                "_dispatch_runloop_root_queue_perform_4CF",
                as: _dispatch_runloop_root_queue_perform_4CF_callback.self
            )
            _dispatch_runloop_root_queue_get_port_4CF = Self.resolve(
                "_dispatch_runloop_root_queue_get_port_4CF",
                as: _dispatch_runloop_root_queue_get_port_4CF_callback.self
            )
            dispatch_set_context = Self.resolve("dispatch_set_context", as: dispatch_set_context_callback.self)
            dispatch_get_context = Self.resolve("dispatch_get_context", as: dispatch_get_context_callback.self)
            dispatch_set_finalizer_f = Self.resolve("dispatch_set_finalizer_f", as: dispatch_set_finalizer_f_callback.self)
        }

        /// Builds a fresh table inside its own autorelease pool.
        static func create() -> Self {
            autoreleasepool {
                .init()
            }
        }

        nonisolated(unsafe)
        static let key = DispatchSpecificKey<AnyCancellable>()

        /// Creates a run-loop root queue and starts a thread named "looper"
        /// whose run loop services the queue's mach port through a `Receiver`.
        ///
        /// An `AnyCancellable` is attached to the queue as an associated
        /// object; when it is cancelled it invalidates the port and stops the
        /// receiver's run loop.
        ///
        /// - Returns: The newly created queue.
        func runLoopDispatchQueue() -> DispatchSerialQueue {
            let runLoopQueue = _dispatch_runloop_root_queue_create_4CF(nil, 0).takeRetainedValue()
            let mach = _dispatch_runloop_root_queue_get_port_4CF(runLoopQueue)
            let cfPort = NSMachPort(machPort: mach, options: [])
            let delegate = Receiver(perform: self._dispatch_runloop_root_queue_perform_4CF)
            delegate.ref = runLoopQueue
            cfPort.setDelegate(delegate)

            // The delegate's address doubles as the association key.
            let teardown = AnyCancellable {
                cfPort.invalidate()
                if let loop = delegate.runLoop {
                    CFRunLoopStop(loop)
                }
            }
            objc_setAssociatedObject(
                runLoopQueue,
                Unmanaged.passUnretained(delegate).toOpaque(),
                teardown,
                .OBJC_ASSOCIATION_RETAIN_NONATOMIC
            )

            let th = Thread(target: delegate, selector: #selector(delegate.run), object: cfPort)
            th.name = "looper"
            th.start()
            return runLoopQueue
        }
    }
}
| //@_silgen_name("CFRunLoopSourceWakeUpRunLoops") | |
| //func _CFRunLoopSourceWakeUpRunLoops(_ source: CFRunLoopSource) -> Void | |
/// Namespace for CoreFoundation functions that are fetched by name from the
/// CoreFoundation bundle.
enum CoreFoundationNamespace {
    /// Table of C function pointers obtained through
    /// `CFBundleGetFunctionPointersForNames`.
    struct symbols: BitwiseCopyable {
        let retain: CFTreeRetainCallBack
        let release: CFTreeReleaseCallBack
        let hash: CFSetHashCallBack
        let equal: CFArrayEqualCallBack
        let copyDescription: CFBagCopyDescriptionCallBack
        let _CFRunLoopGet2: @convention(c) (CFRunLoop) -> RunLoop

        /// Fetches all six pointers in one call; the slot order below matches
        /// the order of `names`.
        private init() {
            let bundle = CFBundleGetBundleWithIdentifier("com.apple.CoreFoundation" as CFString)!
            let names = [
                "CFRetain",
                "CFRelease",
                "CFHash",
                "CFEqual",
                "CFCopyDescription",
                "_CFRunLoopGet2",
            ]
            var pointers = ContiguousArray<UnsafeMutableRawPointer?>(repeating: nil, count: names.count)
            pointers.withUnsafeMutableBufferPointer { buffer in
                CFBundleGetFunctionPointersForNames(bundle, names as CFArray, buffer.baseAddress!)
            }
            retain = unsafeBitCast(pointers[0], to: CFTreeRetainCallBack.self)
            release = unsafeBitCast(pointers[1], to: CFTreeReleaseCallBack.self)
            hash = unsafeBitCast(pointers[2], to: CFSetHashCallBack.self)
            equal = unsafeBitCast(pointers[3], to: CFArrayEqualCallBack.self)
            copyDescription = unsafeBitCast(pointers[4], to: CFBagCopyDescriptionCallBack.self)
            _CFRunLoopGet2 = unsafeBitCast(pointers[5], to: (@convention(c) (CFRunLoop) -> RunLoop).self)
        }

        /// Builds a fresh table inside its own autorelease pool.
        static func create() -> Self {
            autoreleasepool {
                .init()
            }
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import Synchronization | |
| final class CurrentRunLoopExecutor: SerialExecutor, TaskExecutor { | |
| // RunLoopExecutor, SchedulingExecutor | |
/// Creates an empty binary heap whose elements are boxed `UnownedJob`
/// values, ordered so that the highest-priority job is the heap minimum.
private static func makeBinaryHeap() -> CFBinaryHeap {
    var callbacks = kCFStringBinaryHeapCallBacks
    // Borrow the generic CFType retain/release/description callbacks.
    callbacks.copyDescription = kCFTypeDictionaryKeyCallBacks.copyDescription
    callbacks.release = kCFTypeDictionaryKeyCallBacks.release
    callbacks.retain = kCFTypeDictionaryKeyCallBacks.retain
    callbacks.compare = { lhsRaw, rhsRaw, _ in
        let lhs = Unmanaged<AnyObject>.fromOpaque(lhsRaw!).takeUnretainedValue() as! UnownedJob
        let rhs = Unmanaged<AnyObject>.fromOpaque(rhsRaw!).takeUnretainedValue() as! UnownedJob
        // Reversed on purpose: a higher priority sorts as "less than".
        if lhs.priority > rhs.priority {
            return .compareLessThan
        }
        if lhs.priority < rhs.priority {
            return .compareGreaterThan
        }
        return .compareEqualTo
    }
    return CFBinaryHeapCreate(nil, 0, &callbacks, nil)!
}
/// Mutable state shared between the executor and its run-loop source; also
/// the target object for timers created by the scheduler methods.
class RunLoopContextStore {
    /// Pending jobs; accessed under `lock`.
    var heap: CFBinaryHeap = CurrentRunLoopExecutor.makeBinaryHeap()
    let lock = NSRecursiveLock()
    var serial: UnownedSerialExecutor!
    var task: UnownedTaskExecutor?
    /// Incremented while `run()` / `runUntil(_:)` are executing.
    var enterCount = 0

    deinit {
        precondition(CFBinaryHeapGetCount(heap) == 0, "leak of unprocessed tasks")
    }

    /// Timer callback for repeating schedules: runs the block while the
    /// `RepeatInfo` flag is still set, otherwise invalidates the timer.
    @objc func repeatedBlock(_ timer: Timer) {
        let info = unsafeDowncast(timer.userInfo as AnyObject, to: RepeatInfo.self)
        guard info.boolean.load(ordering: .acquiring) else {
            timer.invalidate()
            return
        }
        info.block()
    }

    /// Timer callback for one-shot schedules: calls the block stored in the
    /// timer's `userInfo`, if there is one.
    @objc func perform(timer: Timer) {
        guard let payload = timer.userInfo else {
            return
        }
        let block = unsafeBitCast(payload as AnyObject, to: (@convention(block) () -> Void).self)
        block()
    }
}
| struct RunningInfo { | |
| var task:UnownedTaskExecutor? | |
| var serial:UnownedSerialExecutor | |
| } | |
| @TaskLocal | |
| private static var _current:Unmanaged<CurrentRunLoopExecutor>? | |
| nonisolated(unsafe) | |
| private let runLoop = RunLoop.current | |
| let _isMainExecutor:Bool | |
| nonisolated(unsafe) | |
| private let store:RunLoopContextStore | |
/// Version-0 run-loop source whose `info` is a `RunLoopContextStore`.
///
/// When the source fires it swaps the store's heap for an empty one under the
/// lock and then runs every job from the detached heap, heap minimum first.
nonisolated(unsafe)
private let source: CFRunLoopSource = {
    let sym = CoreFoundationNamespace.symbols.create()
    var context = CFRunLoopSourceContext()
    context.retain = sym.retain
    context.release = sym.release
    context.copyDescription = sym.copyDescription
    context.hash = sym.hash
    context.equal = sym.equal
    context.perform = { info in
        let store = Unmanaged<RunLoopContextStore>.fromOpaque(info!).takeUnretainedValue()
        // Detach the pending jobs so enqueuers only contend for the swap.
        var pending = CurrentRunLoopExecutor.makeBinaryHeap()
        store.lock.withLock {
            swap(&store.heap, &pending)
        }
        let taskExecutor = store.task
        let serialExecutor = store.serial!
        // Read the minimum through the plain CFBinaryHeap accessors; the
        // count check guarantees the heap is non-empty.
        while CFBinaryHeapGetCount(pending) > 0, let raw = CFBinaryHeapGetMinimum(pending) {
            // Copy the job out of its box before the heap releases the box.
            let job = Unmanaged<AnyObject>.fromOpaque(raw).takeUnretainedValue() as! UnownedJob
            CFBinaryHeapRemoveMinimumValue(pending)
            if let taskExecutor {
                job.runSynchronously(isolatedTo: serialExecutor, taskExecutor: taskExecutor)
            } else {
                job.runSynchronously(on: serialExecutor)
            }
        }
    }
    let buffer = RunLoopContextStore()
    context.info = Unmanaged.passUnretained(buffer).toOpaque()
    return CFRunLoopSourceCreate(nil, 0, &context)
}()
/// Helper whose `invoke()` records the task-local executor visible at the
/// moment it is called.
class Invoker: NSObject {
    /// Set once `invoke()` has run.
    private(set) var invoked = false
    /// Executor read from `CurrentRunLoopExecutor._current`, if any.
    var storage: CurrentRunLoopExecutor?

    @objc func invoke() {
        storage = CurrentRunLoopExecutor._current.map { $0.takeUnretainedValue() }
        invoked = true
    }
}
/// Schedules an `Invoker` on the current thread, spins the run loop in the
/// default mode until it has fired, and returns the executor it observed.
private static func peekLocal() -> CurrentRunLoopExecutor? {
    let probe = Invoker()
    probe.perform(#selector(Invoker.invoke), on: Thread.current, with: nil, waitUntilDone: false)
    let limit = CFDateGetAbsoluteTime(Date.distantFuture as CFDate)
    while !probe.invoked {
        CFRunLoopRunInMode(.defaultMode, limit, true)
    }
    return probe.storage
}
/// Returns an executor for the calling thread's run loop.
///
/// Resolution order:
/// 1. main thread: always a new executor;
/// 2. the task-local executor, when its run loop is the current one;
/// 3. an executor found through `peekLocal()` while the run loop is running;
/// 4. otherwise a new executor (the run loop is idle, or it is running
///    without a `CurrentRunLoopExecutor` in the nested loop).
internal static func currentExecutor() -> CurrentRunLoopExecutor {
    if Thread.isMainThread {
        return CurrentRunLoopExecutor()
    }
    if let bound = _current?.takeUnretainedValue(), bound.runLoop == .current {
        return bound
    }
    if RunLoop.current.currentMode != nil, let discovered = peekLocal() {
        return discovered
    }
    return CurrentRunLoopExecutor()
}
/// Recovers the `RunLoopContextStore` from the source's context and records
/// this executor's unowned references in it.
private init() {
    _isMainExecutor = runLoop === RunLoop.main
    var sourceContext = CFRunLoopSourceContext()
    CFRunLoopSourceGetContext(source, &sourceContext)
    store = Unmanaged<RunLoopContextStore>.fromOpaque(sourceContext.info).takeUnretainedValue()
    // All stored properties are set from here on, so `self` may be used.
    store.serial = asUnownedSerialExecutor()
    store.task = asUnownedTaskExecutor()
}
/// Moves every pending job from this executor's heap into `current`'s heap
/// and signals `current`'s run-loop source.
private func transferJob(to current: CurrentRunLoopExecutor) {
    var drained = Self.makeBinaryHeap()
    store.lock.withLock {
        swap(&store.heap, &drained)
    }
    let destination = current.store
    destination.lock.withLock {
        let target = Unmanaged.passUnretained(destination.heap).toOpaque()
        CFBinaryHeapApplyFunction(drained, { value, context in
            let heap = Unmanaged<CFBinaryHeap>.fromOpaque(context!).takeUnretainedValue()
            CFBinaryHeapAddValue(heap, value)
        }, target)
    }
    CFRunLoopSourceSignal(current.source)
}
/// Installs the job source (unless already present) and runs the run loop
/// in the default mode, repeating for as long as each run times out.
///
/// - Throws: `CancellationError` when called off the captured run loop, or
///   when another executor is already driving this run loop (pending jobs
///   are handed over to it first).
public func run() throws {
    guard runLoop === RunLoop.current else {
        throw CancellationError()
    }
    let cfLoop = runLoop.getCFRunLoop()
    let installedHere: Bool
    if CFRunLoopContainsSource(cfLoop, source, .defaultMode) {
        installedHere = false
    } else {
        if runLoop.currentMode != nil, let active = Self.peekLocal(), active !== self {
            // Another executor already owns the running loop.
            transferJob(to: active)
            throw CancellationError()
        }
        CFRunLoopAddSource(cfLoop, source, .defaultMode)
        installedHere = true
    }
    defer {
        if installedHere {
            CFRunLoopRemoveSource(cfLoop, source, .defaultMode)
        }
    }

    store.enterCount += 1
    defer { store.enterCount -= 1 }
    CurrentRunLoopExecutor.$_current.withValue(.passUnretained(self)) {
        let limit = CFDateGetAbsoluteTime(Date.distantFuture as CFDate)
        while CFRunLoopRunInMode(.defaultMode, limit, false) == .timedOut {
        }
    }
}
/// Installs the job source (unless already present) and runs the run loop
/// one pass at a time until `condition` returns true.
///
/// - Parameter condition: Checked before every pass.
/// - Throws: `CancellationError` when called off the captured run loop, or
///   when another executor is already driving this run loop (pending jobs
///   are handed over to it first).
public func runUntil(_ condition: () -> Bool) throws {
    guard runLoop === RunLoop.current else {
        throw CancellationError()
    }
    let cfLoop = runLoop.getCFRunLoop()
    let installedHere: Bool
    if CFRunLoopContainsSource(cfLoop, source, .defaultMode) {
        installedHere = false
    } else {
        if runLoop.currentMode != nil, let active = Self.peekLocal(), active !== self {
            // Another executor already owns the running loop.
            transferJob(to: active)
            throw CancellationError()
        }
        CFRunLoopAddSource(cfLoop, source, .defaultMode)
        installedHere = true
    }
    defer {
        if installedHere {
            CFRunLoopRemoveSource(cfLoop, source, .defaultMode)
        }
    }

    store.enterCount += 1
    defer { store.enterCount -= 1 }
    while !condition() {
        CurrentRunLoopExecutor.$_current.withValue(.passUnretained(self)) {
            RunLoop.current.run(mode: .default, before: .distantFuture)
        }
    }
}
/// Requests that the captured run loop stop.
public func stop() {
    let loop = runLoop.getCFRunLoop()
    CFRunLoopStop(loop)
}

/// Traps unless the calling thread's run loop is the captured one.
func checkIsolated() {
    let onOwnRunLoop = runLoop === RunLoop.current
    precondition(onOwnRunLoop)
}

/// Reports whether the calling thread's run loop is the captured one.
func isIsolatingCurrentContext() -> Bool? {
    runLoop === RunLoop.current
}

/// Unowned reference built with `complexEquality`.
func asUnownedSerialExecutor() -> UnownedSerialExecutor {
    return UnownedSerialExecutor(complexEquality: self)
}

/// Executors are equivalent when they wrap the identical run loop object.
func isSameExclusiveExecutionContext(other: CurrentRunLoopExecutor) -> Bool {
    return other.runLoop === runLoop
}
/// Queues a job for execution on the captured run loop.
///
/// On the main run loop the job is forwarded to `MainActor`. Otherwise it is
/// boxed, pushed onto the store's heap under the lock, and the run-loop
/// source is signalled and the run loop woken.
func enqueue(_ job: consuming ExecutorJob) {
    let unownedJob = UnownedJob(job)
    guard !_isMainExecutor else {
        MainActor.shared.enqueue(unownedJob)
        return
    }
    let boxed = unownedJob as AnyObject
    store.lock.withLock {
        CFBinaryHeapAddValue(store.heap, Unmanaged.passUnretained(boxed).toOpaque())
    }
    CFRunLoopSourceSignal(source)
    CFRunLoopWakeUp(runLoop.getCFRunLoop())
}
| public var isMainExecutor: Bool { false } | |
| // var asScheduling: (any SchedulingExecutor)? { | |
| // return self | |
| // } | |
| // | |
/// Placeholder for clock-based scheduling; marked unavailable.
///
/// The body only inspects the clock type and discards every value it
/// computes — the job is never scheduled.
@available(*, unavailable)
public func enqueue<C: Clock>(
    _ job: consuming ExecutorJob,
    at instant: C.Instant,
    tolerance: C.Duration? = nil,
    clock: C
) {
    switch clock {
    case let continuous as ContinuousClock:
        let deadline = instant as! ContinuousClock.Instant
        _ = continuous.now.duration(to: deadline)
    case is SuspendingClock:
        _ = instant as! SuspendingClock.Instant
    default:
        _ = clock
    }
}
| } | |
| import Combine | |
extension CurrentRunLoopExecutor: Combine.Scheduler {
    typealias SchedulerOptions = RunLoop.SchedulerOptions
    typealias SchedulerTimeType = RunLoop.SchedulerTimeType

    /// Schedules `action` on the captured run loop as soon as possible.
    func schedule(options: SchedulerOptions?, _ action: @escaping () -> Void) {
        guard _isMainExecutor else {
            runLoop.schedule(options: options) {
                action()
            }
            return
        }
        runLoop.schedule(options: options, action)
    }

    /// Schedules `action` once at `date`.
    ///
    /// Off the main run loop this adds a one-shot timer in the common modes,
    /// with the context store as its target; `tolerance` and `options` are
    /// not used on that path.
    func schedule(
        after date: SchedulerTimeType,
        tolerance: SchedulerTimeType.Stride,
        options: SchedulerOptions?,
        _ action: @escaping () -> Void
    ) {
        guard !_isMainExecutor else {
            runLoop.schedule(after: date, tolerance: tolerance, options: options, action)
            return
        }
        let payload: @convention(block) () -> Void = action
        let oneShot = Timer(
            fireAt: date.date,
            interval: 0,
            target: store,
            selector: #selector(RunLoopContextStore.perform(timer:)),
            userInfo: payload,
            repeats: false
        )
        runLoop.add(oneShot, forMode: .common)
    }

    /// Schedules `action` repeatedly, starting at `date`, every `interval`.
    ///
    /// - Returns: A cancellable; off the main run loop it wraps the
    ///   `RepeatInfo` that gates the repeating timer.
    func schedule(
        after date: SchedulerTimeType,
        interval: SchedulerTimeType.Stride,
        tolerance: SchedulerTimeType.Stride,
        options: SchedulerOptions?,
        _ action: @escaping () -> Void
    ) -> any Cancellable {
        guard !_isMainExecutor else {
            return runLoop.schedule(after: date, interval: interval, tolerance: tolerance, options: options, action)
        }
        let info = RepeatInfo(block: action)
        let repeating = Timer(
            fireAt: date.date,
            interval: interval.timeInterval,
            target: store,
            selector: #selector(RunLoopContextStore.repeatedBlock(_:)),
            userInfo: info,
            repeats: true
        )
        info.timer = repeating
        runLoop.add(repeating, forMode: .common)
        return AnyCancellable(info)
    }

    var minimumTolerance: SchedulerTimeType.Stride {
        return runLoop.minimumTolerance
    }

    var now: SchedulerTimeType {
        return runLoop.now
    }

    /// State shared between a repeating timer and its cancellation handle.
    class RepeatInfo: Combine.Cancellable {
        /// True while the schedule is active.
        let boolean = Atomic(true)
        weak var timer: Timer?
        let block: () -> Void

        init(block: @escaping () -> Void) {
            self.block = block
        }

        /// Clears the active flag once and moves the timer's fire date to
        /// now; `repeatedBlock(_:)` invalidates the timer when it sees the
        /// flag cleared.
        func cancel() {
            let (exchanged, _) = boolean.compareExchange(
                expected: true,
                desired: false,
                ordering: .acquiringAndReleasing
            )
            if exchanged {
                timer?.fireDate = Date()
            }
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // | |
| // RunLoopGCDExecutor3.swift | |
| // Sample | |
| // | |
| // Created by 박병관 on 2/6/26. | |
| // | |
| import Foundation | |
| import Dispatch | |
| #if canImport(Darwin) && canImport(ObjectiveC) | |
| import Darwin | |
| import ObjectiveC | |
| import Synchronization | |
| actor RunLoopGCDExecutor { | |
| nonisolated(unsafe) | |
| unowned let runLoop = RunLoop.current | |
| nonisolated(unsafe) | |
| private let threadId = pthread_self() | |
| nonisolated | |
| private let queue:DispatchSerialQueue | |
| nonisolated let wrappedQueue:DispatchSerialQueue | |
| private let isMainThread:Bool = Thread.isMainThread | |
| nonisolated var unownedExecutor: UnownedSerialExecutor { | |
| asUnownedSerialExecutor() | |
| } | |
/// Mach-port delegate that drains the parent executor's root queue each time
/// the queue's port receives a message.
class Delegate: NSObject, NSMachPortDelegate {
    /// Set when a message arrives while a drain is already in progress.
    var didNested = false
    /// True for the duration of a drain.
    var isRunning = false
    /// Stop-request state; this method only moves it from 1 to 2.
    /// NOTE(review): the writer of the value 1 is outside this excerpt
    /// (presumably `stop()`) — confirm the meaning of 0/1/2 there.
    let atomicFlag = Atomic<Int>(0)
    unowned let parent: RunLoopGCDExecutor

    init(_ parent: RunLoopGCDExecutor) {
        self.parent = parent
    }

    /// Drains the parent's queue, ignoring re-entrant deliveries, and stops
    /// the current run loop afterwards when the flag was moved from 1 to 2.
    ///
    /// - Parameter msg: The received mach message; not inspected.
    func handleMachMessage(_ msg: UnsafeMutableRawPointer) {
        if isRunning {
            // Already draining further up the stack; just record the nesting.
            didNested = true
            return
        }
        isRunning = true
        defer {
            isRunning = false
        }
        while Namespace.perform(parent.queue) == true {
        }
        if atomicFlag.load(ordering: .acquiring) == 1,
           atomicFlag.compareExchange(expected: 1, desired: 2, ordering: .acquiringAndReleasing).exchanged {
            CFRunLoopStop(CFRunLoopGetCurrent())
        }
    }
}
/// Bindings to libdispatch entry points (declared with `@_silgen_name`),
/// plus the keys used to locate the executor that is currently running.
private enum Namespace {
    @_silgen_name("_dispatch_runloop_root_queue_create_4CF")
    private static func _create_queue(
        _ label: UnsafePointer<Int8>?,
        _ flags: UInt32
    ) -> Unmanaged<DispatchSerialQueue>

    /// Creates a run-loop root queue with flags 0 and takes ownership of it.
    public static func create_root_queue(
        _ label: String?
    ) -> DispatchSerialQueue {
        let created = _create_queue(label, 0)
        return created.takeRetainedValue()
    }

    @_silgen_name("_dispatch_runloop_root_queue_get_port_4CF")
    public static func get_port(
        _ queue: DispatchSerialQueue
    ) -> mach_port_t

    @_silgen_name("_dispatch_runloop_root_queue_perform_4CF")
    public static func perform(
        _ queue: DispatchSerialQueue
    ) -> CBool

    @_silgen_name("_dispatch_runloop_root_queue_wakeup_4CF")
    public static func wakeup(
        _ queue: DispatchSerialQueue
    )

    @_silgen_name("dispatch_block_create_with_qos_class")
    private static func __dispatch_block_create_with_qos_class(
        _ flags: __dispatch_block_flags_t,
        _ qos_class: qos_class_t,
        _ relative_priority: Int32,
        _ block: @convention(block) () -> Void
    ) -> Unmanaged<AnyObject>

    /// Swift-typed wrapper around `dispatch_block_create_with_qos_class`;
    /// returns the created block object reinterpreted as a block.
    public static func _dispatch_block_create_with_qos_class(
        _ flags: DispatchWorkItemFlags,
        _ qos: DispatchQoS,
        _ block: @convention(block) () -> Void
    ) -> (@convention(block) () -> Void) {
        let created = __dispatch_block_create_with_qos_class(
            .init(rawValue: flags.rawValue),
            qos.qosClass.rawValue,
            Int32(qos.relativePriority),
            block
        )
        let object = created.takeRetainedValue()
        return unsafeBitCast(object, to: (@convention(block) () -> Void).self)
    }

    /// Queue-specific key under which an executor reference is looked up.
    public static let _key: DispatchSpecificKey<Unmanaged<RunLoopGCDExecutor>> = .init()
    /// Task-local reference to an executor.
    @TaskLocal public static var _current: Unmanaged<RunLoopGCDExecutor>? = nil
}
/// Creates the run-loop root queue and an initially inactive serial queue
/// that targets it. Instances are obtained through `current()`.
private init() {
    let rootQueue = Namespace.create_root_queue(nil)
    queue = rootQueue
    wrappedQueue = DispatchSerialQueue(
        label: "RunLoopGCDExecutor",
        attributes: [.initiallyInactive],
        autoreleaseFrequency: .workItem,
        target: rootQueue
    )
}
deinit {
    // `wrappedQueue` is created `.initiallyInactive`; activate it before the
    // last reference goes away.
    // NOTE(review): presumably needed because libdispatch does not allow
    // releasing a queue that is still inactive — confirm.
    self.wrappedQueue.activate()
}
/// Returns the executor associated with the calling context.
///
/// Lookup order:
/// 1. the executor registered under `Namespace._key` on the current dispatch
///    queue;
/// 2. the task-local `Namespace._current`, provided its run loop is the
///    caller's run loop;
/// 3. otherwise a newly created executor.
nonisolated
public static func current() -> RunLoopGCDExecutor {
    let registeredOnQueue = DispatchQueue.getSpecific(key: Namespace._key)
    if let executor = registeredOnQueue?.takeUnretainedValue() {
        return executor
    }

    let taskLocal = Namespace._current?.takeUnretainedValue()
    if let executor = taskLocal, executor.runLoop === RunLoop.current {
        return executor
    }

    return RunLoopGCDExecutor()
}
/// Requests that a running `run()` loop finish.
///
/// Does nothing unless an executor is registered on `wrappedQueue` and its
/// root-queue port has a `Delegate` installed. Otherwise the delegate's flag
/// is moved `0 → 1`; only the caller that wins that transition submits an
/// empty block, so the queue is woken and the delegate can observe the request.
nonisolated
public func stop() {
    guard let registered = wrappedQueue.getSpecific(key: Namespace._key) else {
        return
    }
    let rootQueue = registered.takeUnretainedValue().queue
    let port = NSMachPort.port(withMachPort: Namespace.get_port(rootQueue), options: [])
    guard let delegate = port.delegate() as? Delegate else {
        return
    }

    let request = delegate.atomicFlag.compareExchange(
        expected: 0,
        desired: 1,
        ordering: .acquiringAndReleasing
    )
    guard request.exchanged else {
        return
    }
    wrappedQueue.async {}
}
/// Drives the calling thread's run loop in the default mode until a `stop()`
/// request has been acknowledged by the port delegate (flag value `2`).
///
/// - Throws: `CancellationError` if called from inside a Swift task, from a
///   thread other than the one that created this executor, or while a
///   different executor is the task-local current one.
public func run() throws {
    try ensureRunnableOnCurrentThread()
    try withRunLoopSession(nestedWake: .emptyBlock) { delegate in
        // `stop()` moves the flag 0 → 1 and the delegate moves it 1 → 2 after
        // draining; only the final state ends this loop.
        while delegate.atomicFlag.load(ordering: .acquiring) != 2 {
            CFRunLoopRunInMode(.defaultMode, Date.distantFuture.timeIntervalSinceNow, false)
        }
    }
}

/// Drives the owning run loop in the default mode, one pass at a time, until
/// `condition` returns `true`.
///
/// The condition is evaluated after each pass, so at least one pass is always
/// run. This loop does not consult the stop flag; only `condition` and a
/// failed pass end it.
///
/// - Parameter condition: Checked after every run-loop pass.
/// - Throws: `CancellationError` for the same preflight failures as `run()`,
///   and when `RunLoop.run(mode:before:)` returns `false`.
public func runUntil(_ condition: () -> Bool) throws {
    try ensureRunnableOnCurrentThread()
    try withRunLoopSession(nestedWake: .rootQueueWakeup) { _ in
        repeat {
            if !runLoop.run(mode: .default, before: .distantFuture) {
                throw CancellationError()
            }
        } while !condition()
    }
}

/// How the run-loop observer re-arms the root queue after a port message was
/// skipped because a drain was already in progress.
private enum NestedWake {
    /// Submit an empty block to `wrappedQueue` (used by `run()`).
    case emptyBlock
    /// Call `Namespace.wakeup` on the root queue (used by `runUntil(_:)`).
    case rootQueueWakeup
}

/// Preflight checks shared by `run()` and `runUntil(_:)`.
///
/// Throws when called from a task, when called off the owning thread, or when
/// another executor is already current. In that last case this executor's
/// wrapper queue is first retargeted at the current executor's wrapper queue
/// and activated.
private func ensureRunnableOnCurrentThread() throws {
    guard withUnsafeCurrentTask(body: { $0 == nil }) else {
        throw CancellationError()
    }
    guard threadId == pthread_self() else {
        throw CancellationError()
    }
    if let active = Namespace._current,
       active.toOpaque() != Unmanaged.passUnretained(self).toOpaque() {
        let other = active.takeUnretainedValue()
        wrappedQueue.setTarget(queue: other.wrappedQueue)
        wrappedQueue.activate()
        throw CancellationError()
    }
}

/// Installs the port delegate and observer, runs `body` with this executor as
/// the task-local current one, and tears everything down afterwards.
///
/// If the root-queue port already has a delegate, the call is treated as
/// nested: the existing delegate is reused, no observer is added, and the
/// port and queue registration are left in place on exit.
///
/// - Parameters:
///   - nestedWake: How the observer wakes the root queue after a skipped drain.
///   - body: The loop to execute; receives the active delegate.
private func withRunLoopSession(
    nestedWake: NestedWake,
    _ body: (Delegate) throws -> Void
) throws {
    let port = NSMachPort.port(withMachPort: Namespace.get_port(queue), options: [])
    let nested = port.delegate() != nil
    let delegate = port.delegate() as! Delegate? ?? {
        let fresh = Delegate(self)
        port.setDelegate(fresh)
        RunLoop.current.add(port, forMode: .common)
        return fresh
    }()
    wrappedQueue.setSpecific(key: Namespace._key, value: .passUnretained(self))

    let observer: CFRunLoopObserver?
    if nested {
        observer = nil
    } else {
        // On run-loop exit / before-waiting: if a message was skipped while
        // a drain was running, wake the root queue again.
        let created = CFRunLoopObserverCreateWithHandler(
            nil,
            ([.exit, .beforeWaiting] as CFRunLoopActivity).rawValue,
            true,
            0
        ) { [delegate] _, _ in
            if delegate.didNested, !delegate.isRunning {
                delegate.didNested = false
                switch nestedWake {
                case .emptyBlock:
                    delegate.parent.wrappedQueue.async {}
                case .rootQueueWakeup:
                    Namespace.wakeup(delegate.parent.queue)
                }
            }
        }
        CFRunLoopAddObserver(CFRunLoopGetCurrent(), created, .commonModes)
        observer = created
    }

    defer {
        if let observer {
            CFRunLoopObserverInvalidate(observer)
        }
        // Reset the stop handshake so a later run starts from a clean state.
        delegate.atomicFlag.store(0, ordering: .releasing)
        if !nested {
            RunLoop.current.remove(port, forMode: .common)
            wrappedQueue.setSpecific(key: Namespace._key, value: nil)
        }
    }

    try Namespace.$_current.withValue(Unmanaged.passUnretained(self)) {
        wrappedQueue.activate()
        try body(delegate)
    }
}
| } | |
// MARK: - SerialExecutor

extension RunLoopGCDExecutor: SerialExecutor {

    /// Submits `job` to the wrapper queue, or to the main actor when this
    /// executor was created on the main thread.
    nonisolated
    func enqueue(_ job: consuming ExecutorJob) {
        guard isMainThread else {
            wrappedQueue.enqueue(job)
            return
        }
        MainActor.shared.enqueue(.init(job))
    }

    /// Two executors share an execution context when their run loops are equal.
    nonisolated
    func isSameExclusiveExecutionContext(other: RunLoopGCDExecutor) -> Bool {
        let sameRunLoop = runLoop.isEqual(other.runLoop)
        return sameRunLoop
    }

    /// `true` when called on the thread that created this executor.
    nonisolated
    func isIsolatingCurrentContext() -> Bool? {
        let onOwningThread = threadId == pthread_self()
        return onOwningThread
    }

    /// Traps unless called on the thread that created this executor.
    nonisolated
    func checkIsolated() {
        let onOwningThread = threadId == pthread_self()
        precondition(onOwningThread)
    }

    /// Returns the main actor's executor for a main-thread instance, otherwise
    /// a complex-equality reference to `self`.
    nonisolated
    func asUnownedSerialExecutor() -> UnownedSerialExecutor {
        guard !isMainThread else {
            return MainActor.sharedUnownedExecutor
        }
        return UnownedSerialExecutor(complexEquality: self)
    }
}
// MARK: - Delayed enqueue (currently unavailable)

@available(*,unavailable)
extension RunLoopGCDExecutor {

    /// Maps a job's priority onto a Dispatch QoS with relative priority `0`.
    ///
    /// Falls back to `.unspecified` when the raw priority is not one of the
    /// QoS classes Dispatch knows about, instead of force-unwrapping.
    private static func dispatchQoS(for jobRef: UnownedJob) -> DispatchQoS {
        let rawClass = qos_class_t(UInt32(jobRef.priority.rawValue))
        let qosClass = DispatchQoS.QoSClass(rawValue: rawClass) ?? .unspecified
        return DispatchQoS(qosClass: qosClass, relativePriority: 0)
    }

    /// Converts a tolerance into a Dispatch leeway, keeping both the whole
    /// seconds and the sub-second part.
    ///
    /// Falls back to whole seconds if the nanosecond total overflows.
    private static func leeway(for tolerance: SuspendingClock.Duration) -> DispatchTimeInterval {
        let (sec, atto) = tolerance.components
        let whole = sec.multipliedReportingOverflow(by: 1_000_000_000)
        // 1 ns == 1_000_000_000 attoseconds.
        let total = whole.partialValue.addingReportingOverflow(atto / 1_000_000_000)
        guard !whole.overflow, !total.overflow else {
            return .seconds(Int(sec))
        }
        return .nanoseconds(Int(total.partialValue))
    }

    /// Creates an unscheduled timer source on `wrappedQueue` whose handler
    /// cancels the timer and enqueues `jobRef`.
    ///
    /// The handler captures the source strongly on purpose: the caller only
    /// holds it in a local, so without this the source would be released as
    /// soon as the caller returns and the job would never be enqueued.
    /// NOTE(review): relies on Dispatch dropping the event handler once the
    /// source is cancelled, which breaks the cycle — confirm.
    nonisolated
    private func makeOneShotTimer(for jobRef: UnownedJob) -> DispatchSourceTimer {
        let source = DispatchSource.makeTimerSource(queue: wrappedQueue)
        source.setEventHandler(
            qos: RunLoopGCDExecutor.dispatchQoS(for: jobRef),
            flags: [.enforceQoS]
        ) { [source, unowned wrappedQueue] in
            source.cancel()
            wrappedQueue.enqueue(jobRef)
        }
        return source
    }

    /// Enqueues `job` on the wrapper queue after `delay`, measured on `clock`.
    ///
    /// Only `ContinuousClock` and `SuspendingClock` are handled; any other
    /// clock traps. With a tolerance, a one-shot timer source with that leeway
    /// is used; without one, the job is submitted via an "after" call.
    ///
    /// - Parameters:
    ///   - job: The job to run.
    ///   - delay: How long to wait before enqueuing.
    ///   - tolerance: Optional leeway for the deadline.
    ///   - clock: The clock `delay` is measured against.
    nonisolated
    public func enqueue<C: Clock>(_ job: consuming ExecutorJob,
                                  after delay: C.Duration,
                                  tolerance: C.Duration? = nil,
                                  clock: C) {
        switch clock {
        case is ContinuousClock:
            let (sec, att) = (delay as! ContinuousClock.Duration).components
            // Top bit of a raw dispatch time, as named by the constant below.
            let DISPATCH_UP_OR_MONOTONIC_TIME_MASK: UInt64 = (1 << 63)
            let raw_time = Dispatch.__dispatch_time(
                DISPATCH_UP_OR_MONOTONIC_TIME_MASK,
                sec * Int64(bitPattern: Dispatch.NSEC_PER_SEC) + Int64(att / 1_000_000_000)
            )
            let jobRef = UnownedJob(job)
            if let tol = tolerance as! ContinuousClock.Duration? {
                let source = makeOneShotTimer(for: jobRef)
                let (tolSec, tolAtto) = tol.components
                Dispatch.__dispatch_source_set_timer(
                    source as! DispatchSource,
                    raw_time,
                    Dispatch.DispatchTime.distantFuture.rawValue,
                    UInt64(tolSec) * Dispatch.NSEC_PER_SEC + UInt64(tolAtto / 1_000_000_000)
                )
                source.activate()
            } else {
                let dispatch_block = Namespace._dispatch_block_create_with_qos_class(
                    [.enforceQoS],
                    RunLoopGCDExecutor.dispatchQoS(for: jobRef),
                    { [unowned wrappedQueue] in
                        wrappedQueue.enqueue(jobRef)
                    }
                )
                Dispatch.__dispatch_after(
                    raw_time,
                    wrappedQueue,
                    dispatch_block
                )
            }
        case is SuspendingClock:
            let (sec, att) = (delay as! SuspendingClock.Duration).components
            let dispatchTime = DispatchTime.now()
                .advanced(by: .seconds(Int(sec)))
                .advanced(by: .nanoseconds(Int(att / 1_000_000_000)))
            let jobRef = UnownedJob(job)
            if let tol = tolerance as! SuspendingClock.Duration? {
                let source = makeOneShotTimer(for: jobRef)
                source.schedule(
                    deadline: dispatchTime,
                    repeating: .never,
                    leeway: RunLoopGCDExecutor.leeway(for: tol)
                )
                source.activate()
            } else {
                wrappedQueue.asyncAfter(
                    deadline: dispatchTime,
                    qos: RunLoopGCDExecutor.dispatchQoS(for: jobRef),
                    flags: [.enforceQoS]
                ) { [unowned wrappedQueue] in
                    wrappedQueue.enqueue(jobRef)
                }
            }
        default:
            fatalError("unavailable")
        }
    }

    /// Enqueues `job` at `instant` by converting it to a delay from
    /// `clock.now` and forwarding to `enqueue(_:after:tolerance:clock:)`.
    nonisolated
    public func enqueue<C: Clock>(_ job: consuming ExecutorJob,
                                  at instant: C.Instant,
                                  tolerance: C.Duration? = nil,
                                  clock: C)
    {
        enqueue(job, after: clock.now.duration(to: instant),
                tolerance: tolerance, clock: clock)
    }
}
| import Combine | |
// MARK: - Combine.Scheduler

/// Combine scheduling is forwarded unchanged to the executor's run loop.
extension RunLoopGCDExecutor: Combine.Scheduler {
    typealias SchedulerTimeType = RunLoop.SchedulerTimeType
    typealias SchedulerOptions = RunLoop.SchedulerOptions

    /// The run loop's current scheduler time.
    nonisolated var now: SchedulerTimeType {
        return runLoop.now
    }

    /// The run loop's minimum tolerance.
    nonisolated var minimumTolerance: SchedulerTimeType.Stride {
        return runLoop.minimumTolerance
    }

    /// Schedules `action` through the run loop's scheduler.
    nonisolated func schedule(options: SchedulerOptions?, _ action: @escaping () -> Void) {
        runLoop.schedule(options: options, action)
    }

    /// Schedules `action` through the run loop at `date`.
    nonisolated func schedule(
        after date: SchedulerTimeType,
        tolerance: SchedulerTimeType.Stride,
        options: SchedulerOptions?,
        _ action: @escaping () -> Void
    ) {
        runLoop.schedule(after: date, tolerance: tolerance, options: options, action)
    }

    /// Schedules `action` through the run loop at `date`, repeating every
    /// `interval`, and returns the run loop's cancellation token.
    nonisolated func schedule(
        after date: SchedulerTimeType,
        interval: SchedulerTimeType.Stride,
        tolerance: SchedulerTimeType.Stride,
        options: SchedulerOptions?,
        _ action: @escaping () -> Void
    ) -> any Cancellable {
        return runLoop.schedule(
            after: date,
            interval: interval,
            tolerance: tolerance,
            options: options,
            action
        )
    }
}
| #else | |
/// Unavailable stand-in for `RunLoopGCDExecutor`, declared in the `#else`
/// branch of the file's conditional-compilation block.
@available(*, unavailable)
final class RunLoopGCDExecutor {}
| #endif // canImport(Darwin) && canImport(ObjectiveC) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment