Skip to content

Instantly share code, notes, and snippets.

@RWalkling
Created February 6, 2020 19:48
Show Gist options
  • Select an option

  • Save RWalkling/3635e57f9a0c0d4a13bbd9d348e92660 to your computer and use it in GitHub Desktop.

Select an option

Save RWalkling/3635e57f9a0c0d4a13bbd9d348e92660 to your computer and use it in GitHub Desktop.
import { Observable, Subject } from 'rxjs'
import { distinctUntilChanged, map, withLatestFrom } from 'rxjs/operators'
import { apply, assocPath, path } from 'ramda'
/**
 * Curried accessor type over `TObject`.
 *
 * A `StreamLens` is a function taking one optional `key`:
 * - called with a key of `TObject`, it yields another `StreamLens` focused on
 *   that property's type (`TObject[TKey]`), so calls can be chained;
 * - called without a key (`TKey` falls back to its `undefined` default), it
 *   yields the terminal handle `{ stream, out }` for the current focus.
 *
 * `TOriginal` is passed along unchanged through the recursive reference and is
 * not otherwise used inside this type.
 */
type StreamLens<TObject, TOriginal> = <TKey extends keyof TObject | undefined = undefined>(
key?: TKey,
) => TKey extends keyof TObject
// A key was supplied: refocus on that property's type and keep chaining.
? StreamLens<TObject[TKey], TOriginal>
// No key was supplied: the chain ends with a read/write handle.
: {
// Observable typed with the currently focused type.
readonly stream: Observable<TObject>
// Same signature as `Subject<TObject>.next`.
readonly out: Subject<TObject>['next']
}
/**
 * Builds a chainable lens over the values emitted by `observable`.
 *
 * Every call with a defined key appends that key to an internal path and
 * returns the next step of the chain. A call with no argument (or with
 * `undefined`) ends the chain and returns `{ stream, out }` for that path:
 *
 * - `stream` maps each emission of `observable` through ramda `path` for the
 *   collected keys and suppresses consecutive duplicates.
 * - `out` is `next` bound to a freshly created `Subject`; each value pushed is
 *   paired with the latest emission of `observable`, written at the collected
 *   path with ramda `assocPath`, and the result is delivered to `observable`.
 *
 * NOTE(review): `observable` is handed to `.subscribe(...)` as the observer, so
 * it presumably has to be a `Subject`/`BehaviorSubject` for writes to take
 * effect, which is wider than the declared `Observable` type. Confirm with callers.
 * NOTE(review): every terminating call creates a new subscription whose handle
 * is discarded, so nothing here ever unsubscribes it.
 *
 * @param observable Source of the root object values (also the write target).
 * @returns The root step of the lens chain.
 */
function streamLens<TObject extends object>(observable: Observable<TObject>): StreamLens<TObject, TObject> {
  const descend = (trail = []) => (key?) => {
    // A defined key only extends the path; nothing is subscribed yet.
    if (key !== undefined) {
      return descend(trail.concat(key))
    }

    // Write side: values pushed into `writes` are merged into the latest
    // source value at `trail`, and the merged object is fed back to the source.
    const writes = new Subject()
    const patchLatest = apply(assocPath(trail))
    writes.pipe(withLatestFrom(observable), map(patchLatest)).subscribe(observable)

    // Read side: project the focused value and drop consecutive repeats.
    const reads = observable.pipe(map(path(trail)), distinctUntilChanged())

    return {
      stream: reads,
      out: writes.next.bind(writes),
    }
  }

  return descend()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment