Skip to content

Instantly share code, notes, and snippets.

@pbk20191
Last active December 24, 2025 07:42
Show Gist options
  • Select an option

  • Save pbk20191/84e1164b4e21fc994ac622a8172616ff to your computer and use it in GitHub Desktop.

Select an option

Save pbk20191/84e1164b4e21fc994ac622a8172616ff to your computer and use it in GitHub Desktop.
CFRunLoop derived DispatchSerialQueue
import Foundation
import Dispatch
class Receiver: NSObject, NSMachPortDelegate {
weak var ref:DispatchQueue?
weak var runLoop:CFRunLoop?
let perform:@convention(c) (DispatchQueue) -> DarwinBoolean
func handleMachMessage(_ msg: UnsafeMutableRawPointer) {
guard let ref else { return }
while autoreleasepool(invoking: {
perform(ref)
}) == true {}
}
init( perform: @convention(c) (DispatchQueue) -> DarwinBoolean) {
self.perform = perform
}
@objc func run(_ port:NSMachPort) {
autoreleasepool {
self.runLoop = CFRunLoopGetCurrent()
port.setDelegate(self)
RunLoop.current.add(port, forMode: .common)
}
autoreleasepool {
while port.isValid, RunLoop.current.run(mode: .default, before: .distantFuture) {
}
}
}
}
import Combine
enum LibDispatchNameSpace {
var symbols: symbols { .create() }
struct symbols:BitwiseCopyable {
typealias _dispatch_runloop_root_queue_create_4CF_callback = @convention(c) (UnsafePointer<CChar>?, CFOptionFlags) -> Unmanaged<dispatch_queue_serial_t>
typealias _dispatch_runloop_root_queue_perform_4CF_callback = @convention(c) (dispatch_queue_t) -> DarwinBoolean
typealias _dispatch_runloop_root_queue_get_port_4CF_callback = @convention(c) (dispatch_queue_t) -> mach_port_t
typealias dispatch_set_context_callback = @convention(c) (dispatch_object_t, UnsafeMutableRawPointer?) -> Void
typealias dispatch_get_context_callback = @convention(c) (dispatch_object_t) -> UnsafeMutableRawPointer?
typealias dispatch_set_finalizer_f_callback = @convention(c) (dispatch_object_t, CFTreeReleaseCallBack?) -> Void
let _dispatch_runloop_root_queue_create_4CF:_dispatch_runloop_root_queue_create_4CF_callback
let _dispatch_runloop_root_queue_perform_4CF:_dispatch_runloop_root_queue_perform_4CF_callback
let _dispatch_runloop_root_queue_get_port_4CF:_dispatch_runloop_root_queue_get_port_4CF_callback
let dispatch_set_context:dispatch_set_context_callback
let dispatch_get_context:dispatch_get_context_callback
let dispatch_set_finalizer_f:dispatch_set_finalizer_f_callback
private init() {
do {
//_dispatch_continuation_redirect_push
Bundle(for: NSClassFromString("OS_dispatch_queue")!).load()
// NSStringFromClass(<#T##aClass: AnyClass##AnyClass#>)
// let cfBundle = CFBundleCreate(nil, URL(string: "/usr/lib/system/introspection")! as CFURL)
let symbols = [
"_dispatch_runloop_root_queue_create_4CF",
"_dispatch_runloop_root_queue_perform_4CF",
"_dispatch_runloop_root_queue_get_port_4CF",
"dispatch_set_context",
"dispatch_get_context",
"dispatch_set_finalizer_f",
"_dispatch_lane_invoke",
"_dispatch_lane_push"
]
// DispatchSource.makeMachSendSource(port: <#T##mach_port_t#>, eventMask: <#T##DispatchSource.MachSendEvent#>)
var list = ContiguousArray<UnsafeMutableRawPointer?>()
for i in symbols {
i.withCString {
list.append(dlsym(.init(bitPattern: -2)!, $0))
}
}
// list.withUnsafeMutableBufferPointer {
// dlsym(.init(bitPattern: -2)!, <#T##__symbol: UnsafePointer<CChar>!##UnsafePointer<CChar>!#>)
// CFBundleGetFunctionPointersForNames(cfBundle, symbols as CFArray, $0.baseAddress!)
//
// }
_dispatch_runloop_root_queue_create_4CF = unsafeBitCast(list[0], to: _dispatch_runloop_root_queue_create_4CF_callback.self)
_dispatch_runloop_root_queue_perform_4CF = unsafeBitCast(list[1], to: _dispatch_runloop_root_queue_perform_4CF_callback.self)
_dispatch_runloop_root_queue_get_port_4CF = unsafeBitCast(list[2], to: _dispatch_runloop_root_queue_get_port_4CF_callback.self)
dispatch_set_context = unsafeBitCast(list[3], to: dispatch_set_context_callback.self)
dispatch_get_context = unsafeBitCast(list[4], to: dispatch_get_context_callback.self)
dispatch_set_finalizer_f = unsafeBitCast(list[5], to: dispatch_set_finalizer_f_callback.self)
}
}
static func create() -> Self {
autoreleasepool {
.init()
}
}
nonisolated(unsafe)
static let key = DispatchSpecificKey<AnyCancellable>()
func runLoopDispatchQueue() -> DispatchSerialQueue {
let cfSym = CoreFoundationNamespace.symbols.create()
let runLoopQueue = _dispatch_runloop_root_queue_create_4CF(nil, 0).takeRetainedValue()
let mach = _dispatch_runloop_root_queue_get_port_4CF(runLoopQueue)
let cfPort = NSMachPort(machPort: mach, options: [])
let delegate = Receiver(perform: self._dispatch_runloop_root_queue_perform_4CF)
delegate.ref = runLoopQueue
cfPort.setDelegate(delegate)
objc_setAssociatedObject(runLoopQueue, Unmanaged.passUnretained(delegate).toOpaque(), AnyCancellable{
cfPort.invalidate()
delegate.runLoop.flatMap(CFRunLoopStop)
}, .OBJC_ASSOCIATION_RETAIN_NONATOMIC)
let th = Thread(target: delegate, selector: #selector(delegate.run), object: cfPort)
th.name = "looper"
th.start()
return runLoopQueue
}
}
}
//@_silgen_name("CFRunLoopSourceWakeUpRunLoops")
//func _CFRunLoopSourceWakeUpRunLoops(_ source: CFRunLoopSource) -> Void
enum CoreFoundationNamespace {
struct symbols:BitwiseCopyable {
let retain:CFTreeRetainCallBack
let release:CFTreeReleaseCallBack
let hash:CFSetHashCallBack
let equal:CFArrayEqualCallBack
let copyDescription:CFBagCopyDescriptionCallBack
let _CFRunLoopGet2:@convention(c) (CFRunLoop) -> RunLoop
// let _CFFireTimer:CFRunLoopTimerCallBack
// let _CFRunLoopSourceWakeUpRunLoops:@convention(c) (CFRunLoopSource) -> Void
private init() {
do {
let cfBundle = CFBundleGetBundleWithIdentifier("com.apple.CoreFoundation" as CFString)!
let symbols = [
"CFRetain",
"CFRelease",
"CFHash",
"CFEqual",
"CFCopyDescription",
"_CFRunLoopGet2",
// "_CFRunLoopSourceWakeUpRunLoops",
// "_CFFireTimer"
]
var list = ContiguousArray<UnsafeMutableRawPointer?>.init(repeating: nil, count: symbols.count)
list.withUnsafeMutableBufferPointer {
CFBundleGetFunctionPointersForNames(cfBundle, symbols as CFArray, $0.baseAddress!)
}
retain = unsafeBitCast(list[0], to: CFTreeRetainCallBack.self)
release = unsafeBitCast(list[1], to: CFTreeReleaseCallBack.self)
hash = unsafeBitCast(list[2], to: CFSetHashCallBack.self)
equal = unsafeBitCast(list[3], to: CFArrayEqualCallBack.self)
copyDescription = unsafeBitCast(list[4], to: CFBagCopyDescriptionCallBack.self)
_CFRunLoopGet2 = unsafeBitCast(list[5], to: (@convention(c) (CFRunLoop) -> RunLoop).self)
// __CFFireTimer = unsafeBitCast(list[6], to: CFRunLoopTimerCallBack.self)
// _CFRunLoopSourceWakeUpRunLoops = unsafeBitCast(list[6], to: (@convention(c) (CFRunLoopSource) -> Void).self)
}
}
static func create() -> Self {
autoreleasepool {
.init()
}
}
}
}
import Synchronization
final class CurrentRunLoopExecutor: SerialExecutor, TaskExecutor {
// RunLoopExecutor, SchedulingExecutor
private static func makeBinaryHeap() -> CFBinaryHeap {
var heapCb = kCFStringBinaryHeapCallBacks
heapCb.retain = kCFTypeDictionaryKeyCallBacks.retain
heapCb.release = kCFTypeDictionaryKeyCallBacks.release
heapCb.copyDescription = kCFTypeDictionaryKeyCallBacks.copyDescription
heapCb.compare = {
// reverse order
let lhs = Unmanaged<AnyObject>.fromOpaque($0!).takeUnretainedValue() as! UnownedJob
let rhs = Unmanaged<AnyObject>.fromOpaque($1!).takeUnretainedValue() as! UnownedJob
let _ = $2
if rhs.priority < lhs.priority {
return .compareLessThan
}
if rhs.priority > lhs.priority {
return .compareGreaterThan
}
return .compareEqualTo
}
return CFBinaryHeapCreate(nil, 0, &heapCb, nil)!
}
class RunLoopContextStore {
var heap:CFBinaryHeap = CurrentRunLoopExecutor.makeBinaryHeap()
let lock = NSRecursiveLock()
var serial:UnownedSerialExecutor!
var task:UnownedTaskExecutor?
var enterCount = 0
deinit {
precondition(CFBinaryHeapGetCount(heap) == 0, "leak of unprocessed tasks")
}
@objc func repeatedBlock(_ timer:Timer) {
let info = unsafeDowncast(timer.userInfo as AnyObject, to: RepeatInfo.self)
if info.boolean.load(ordering: .acquiring) {
info.block()
} else {
// counter.subtract(1, ordering: .relaxed)
timer.invalidate()
}
}
@objc func perform( timer:Timer) {
let block = timer.userInfo.flatMap {
unsafeBitCast($0 as AnyObject, to: (@convention(block) () -> Void).self)
}
if let block {
block()
// counter.subtract(1, ordering: .relaxed)
} else {
// isActive = false
// CFRunLoopStop(CFRunLoopGetCurrent())
}
}
}
struct RunningInfo {
var task:UnownedTaskExecutor?
var serial:UnownedSerialExecutor
}
@TaskLocal
private static var _current:Unmanaged<CurrentRunLoopExecutor>?
nonisolated(unsafe)
private let runLoop = RunLoop.current
let _isMainExecutor:Bool
nonisolated(unsafe)
private let store:RunLoopContextStore
nonisolated(unsafe)
private let source:CFRunLoopSource = {
let sym = CoreFoundationNamespace.symbols.create()
var source = CFRunLoopSourceContext()
source.retain = sym.retain
source.release = sym.release
source.copyDescription = sym.copyDescription
source.hash = sym.hash
source.equal = sym.equal
source.perform = {
let store = Unmanaged<RunLoopContextStore>.fromOpaque($0!).takeUnretainedValue()
var newHeap = CurrentRunLoopExecutor.makeBinaryHeap()
var info = RunningInfo.init(task: store.task, serial: store.serial)
store.lock.withLock {
swap(&store.heap, &newHeap)
}
let functionCall = unsafeBitCast(CFBinaryHeapGetMinimumIfPresent, to: ((CFBinaryHeap, AutoreleasingUnsafeMutablePointer<AnyObject?>) -> Bool).self)
weak var value:AnyObject?
let taskExecutor = store.task
let serialExecutor = store.serial!
while functionCall(newHeap, &value), let job = value.flatMap({ ($0 as! UnownedJob) }) {
value = nil
CFBinaryHeapRemoveMinimumValue(newHeap)
if let taskExecutor {
job.runSynchronously(isolatedTo: serialExecutor, taskExecutor: taskExecutor)
} else {
job.runSynchronously(on: serialExecutor)
}
}
}
let buffer = RunLoopContextStore()
source.info = Unmanaged.passUnretained(buffer).toOpaque()
return CFRunLoopSourceCreate(nil, 0, &source)
}()
class Invoker:NSObject {
private(set) var invoked = false
var storage:CurrentRunLoopExecutor?
@objc func invoke() {
invoked = true
storage = CurrentRunLoopExecutor._current?.takeUnretainedValue()
}
}
private static func peekLocal() -> CurrentRunLoopExecutor? {
let invoker = Invoker()
invoker.perform(#selector(invoker.invoke), on: Thread.current, with: nil, waitUntilDone: false)
while !invoker.invoked {
let future = CFDateGetAbsoluteTime(Date.distantFuture as CFDate)
CFRunLoopRunInMode(.defaultMode, future, true)
}
if let ref = invoker.storage {
return ref
}
return nil
}
internal static func currentExecutor() -> CurrentRunLoopExecutor {
if Thread.isMainThread {
// we are safe
return CurrentRunLoopExecutor()
} else if let t = _current?.takeUnretainedValue(), t.runLoop == .current {
// current RunLooperRunner === currentRunLoop
// this is also safe
return t
} else if RunLoop.current.currentMode != nil, let local = peekLocal() {
return local
} else {
/*
1. RunLoop is active but does not have `CurrentRunLoopExecutor` in nested loop
2. RunLoop is inactive -> which should always create new one
*/
return CurrentRunLoopExecutor()
}
}
private init() {
var context = CFRunLoopSourceContext()
CFRunLoopSourceGetContext(source, &context)
store = Unmanaged.fromOpaque(context.info).takeUnretainedValue()
_isMainExecutor = runLoop === RunLoop.main
store.serial = asUnownedSerialExecutor()
store.task = asUnownedTaskExecutor()
}
private func transferJob(to current:CurrentRunLoopExecutor) {
var newHeap = Self.makeBinaryHeap()
self.store.lock.withLock {
swap(&self.store.heap, &newHeap)
}
current.store.lock.withLock {
CFBinaryHeapApplyFunction(newHeap, {
CFBinaryHeapAddValue(Unmanaged<CFBinaryHeap>.fromOpaque($1!).takeUnretainedValue(), $0)
}, Unmanaged.passUnretained(current.store.heap).toOpaque())
}
CFRunLoopSourceSignal(current.source)
}
public func run() throws {
guard runLoop === RunLoop.current else {
throw CancellationError()
}
let shouldRemoveSource:Bool
if !CFRunLoopContainsSource(runLoop.getCFRunLoop(), source, .defaultMode) {
if runLoop.currentMode != nil, let t = Self.peekLocal(), t !== self {
// already installed runloop detected
transferJob(to: t)
throw CancellationError()
}
CFRunLoopAddSource(runLoop.getCFRunLoop(), source, .defaultMode)
shouldRemoveSource = true
} else {
shouldRemoveSource = false
}
defer {
if shouldRemoveSource {
CFRunLoopRemoveSource(runLoop.getCFRunLoop(), source, .defaultMode)
}
}
do {
store.enterCount += 1
defer { store.enterCount -= 1}
CurrentRunLoopExecutor.$_current.withValue(.passUnretained(self)) {
let future = CFDateGetAbsoluteTime(Date.distantFuture as CFDate)
while CFRunLoopRunInMode(.defaultMode, future, false) == .timedOut {
}
}
}
}
public func runUntil(_ condition: () -> Bool) throws {
guard runLoop === RunLoop.current else {
throw CancellationError()
}
let shouldRemoveSource:Bool
if !CFRunLoopContainsSource(runLoop.getCFRunLoop(), source, .defaultMode) {
if runLoop.currentMode != nil, let t = Self.peekLocal(), t !== self {
// already installed runloop detected
transferJob(to: t)
throw CancellationError()
}
CFRunLoopAddSource(runLoop.getCFRunLoop(), source, .defaultMode)
shouldRemoveSource = true
} else {
shouldRemoveSource = false
}
defer {
if shouldRemoveSource {
CFRunLoopRemoveSource(runLoop.getCFRunLoop(), source, .defaultMode)
}
}
do {
store.enterCount += 1
defer {
store.enterCount -= 1
}
while !condition() {
CurrentRunLoopExecutor.$_current.withValue(.passUnretained(self)) {
RunLoop.current.run(mode: .default, before: .distantFuture)
}
}
}
}
public func stop() {
CFRunLoopStop(runLoop.getCFRunLoop())
}
func checkIsolated() {
precondition(runLoop === RunLoop.current)
}
func isIsolatingCurrentContext() -> Bool? {
return runLoop === RunLoop.current
}
func asUnownedSerialExecutor() -> UnownedSerialExecutor {
UnownedSerialExecutor(complexEquality: self)
}
func isSameExclusiveExecutionContext(other: CurrentRunLoopExecutor) -> Bool {
runLoop === other.runLoop
}
func enqueue(_ job: consuming ExecutorJob) {
if _isMainExecutor {
MainActor.shared.enqueue(UnownedJob(job))
} else {
let job = UnownedJob(job)
let ref = job as AnyObject
self.store.lock.withLock {
let t = Unmanaged.passUnretained(ref).toOpaque()
CFBinaryHeapAddValue(self.store.heap, t)
}
CFRunLoopSourceSignal(source)
CFRunLoopWakeUp(runLoop.getCFRunLoop())
// let ref = asUnownedSerialExecutor()
// runLoop.perform {
// job.runSynchronously(on: ref)
// }
}
}
public var isMainExecutor: Bool { false }
// var asScheduling: (any SchedulingExecutor)? {
// return self
// }
//
@available(*,unavailable)
public func enqueue<C: Clock>(_ job: consuming ExecutorJob,
at instant: C.Instant,
tolerance: C.Duration? = nil,
clock: C)
{
if let continuous = clock as? ContinuousClock {
let i = (instant as! ContinuousClock.Instant)
continuous.now.duration(to: i)
} else if let suspending = clock as? SuspendingClock {
(instant as! SuspendingClock.Instant)
} else {
clock
}
// _dispatchEnqueue(job, at: instant, tolerance: tolerance, clock: clock,
// executor: self, global: false)
}
}
import Combine
extension CurrentRunLoopExecutor: Combine.Scheduler {
typealias SchedulerOptions = RunLoop.SchedulerOptions
typealias SchedulerTimeType = RunLoop.SchedulerTimeType
func schedule(options: SchedulerOptions?, _ action: @escaping () -> Void) {
if _isMainExecutor {
return runLoop.schedule(options: options, action)
}
self.runLoop.schedule(options: options) {
action()
}
}
func schedule(
after date: SchedulerTimeType,
tolerance: SchedulerTimeType.Stride,
options: SchedulerOptions?,
_ action: @escaping () -> Void
) {
if _isMainExecutor {
return runLoop.schedule(after: date, tolerance: tolerance, options: options, action)
}
let block:@convention(block) () -> Void = action
let timer = Timer(fireAt: date.date, interval: 0, target: self.store, selector: #selector(RunLoopContextStore.perform(timer:)), userInfo: block, repeats: false)
runLoop.add(timer, forMode: .common)
}
func schedule(
after date: SchedulerTimeType,
interval: SchedulerTimeType.Stride,
tolerance: SchedulerTimeType.Stride,
options: SchedulerOptions?,
_ action: @escaping () -> Void
) -> any Cancellable {
if _isMainExecutor {
return runLoop.schedule(after: date, interval: interval, tolerance: tolerance, options: options, action)
}
let info = RepeatInfo(block: action)
let timer = Timer(fireAt: date.date, interval: interval.timeInterval, target: self.store, selector: #selector(RunLoopContextStore.repeatedBlock(_:)), userInfo: info, repeats: true)
info.timer = timer
self.runLoop.add(timer, forMode: .common)
return AnyCancellable(info)
}
var minimumTolerance: SchedulerTimeType.Stride {
runLoop.minimumTolerance
}
var now: SchedulerTimeType {
runLoop.now
}
class RepeatInfo:Combine.Cancellable {
let boolean = Atomic(true)
weak var timer:Timer?
let block:() -> Void
init(block: @escaping () -> Void) {
self.block = block
}
func cancel() {
if boolean.compareExchange(expected: true, desired: false, ordering: .acquiringAndReleasing).exchanged {
timer?.fireDate = .init()
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment