rxswift

栏目: Swift · 发布时间: 5年前

内容简介:我们在编程中经常处理各种事件,各种事件也和我们的业务逻辑紧密相关,比如点击了这个btn,我该执行什么逻辑等等、我该用代理、通知、还是target-action来传递我们的事件、我该在main queue还是global queue上执行任务等等。在rxswift看来,无论是什么场景,这里面有一个不变的东西,那就是消息的生产、消息的发送、消息的处理、资源的销毁。我们以前实际上是用业务逻辑去包裹了这一套东西。rxswift认为这样实际上是做反了,我们应该以消息的流动去包裹业务逻辑,而不应该用业务逻辑去包裹消息

我们在编程中经常处理各种事件,各种事件也和我们的业务逻辑紧密相关,比如点击了这个btn,我该执行什么逻辑等等、我该用代理、通知、还是target-action来传递我们的事件、我该在main queue还是global queue上执行任务等等。在rxswift看来,无论是什么场景,这里面有一个不变的东西,那就是消息的生产、消息的发送、消息的处理、资源的销毁。

我们以前实际上是用业务逻辑去包裹了这一套东西。rxswift认为这样实际上是做反了,我们应该以消息的流动去包裹业务逻辑,而不应该用业务逻辑去包裹消息的流动。因为消息的流动是不变的,业务逻辑是变的,根据封装 变化的原则,我们用一个抽象的叫做Observable的东西来封装不同的业务逻辑。然后用Observerbal、Observer、disposable、scheduler等实现一套通用的消息处理机制。这样就能从各种代理、通知、target-action不同的消息处理方式中解放出来,使用同一的Observerable、Observer来实现。而且rx是一整套的,所以如果你切换到java,c++等其他语言,你也可以不用对每种语言都了解线程、传值、通知等,就能直接使用顶层的抽象。ok,按照这样的理解,我们来看看rxswift中的几个比较重要的类。

Observable

消息的流动是从消息的生成开始。而Observable就是代表这个抽象。不过呢,rxswift不止把消息生成的任务交给了Observable,还将消息发送的功能交给了Observable,也就是subscribe函数。

public protocol ObservableType:ObservableConvertibleType{
 //自身将消息交给Observer,让他去处理,处理后返回一个disposable用于取消或者释放资源
 func subscribe<O: ObserverType>(_observer: O) -> Disposable where O.E == E
}

Observer

消息流动过程中消息的处理者。Observer就代表这个抽象。

//Observable发送Event,Observer通过on函数接收,然后执行自己的处理逻辑。
public protocol ObserverType{
    associatedtype E
    func on(_event: Event<E>)
}

Disposable

如果调用了disposable的dispose方法,则代表了,任务已经完成或者取消执行,然后进行资源的清理。disposable有时会很复杂,因为他要清理资源,而我们的Observable可能是由多个Observable按照某种约定生成的,所以最终的disposable需要负责dispose所有的disposable。当然如果情况简单,可以创建一个简单的disposable。

public protocol Disposable{
    /// Dispose resource.
    func dispose()
}

scheduler

提供了统一的方式让我们在不同线程执行任务。MainQueue, GloableQueue…

Producer

producer是实现了Observable协议的一个基类。他重写了subscribe方法,处理了isScheduleRequired的问题,然后将具体的消息发送交给了run方法。

class Producer<Element> :Observable<Element>{
    override init() {
        super.init()
    }
    
    override func subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                print(Thread.current)
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }
    
    func run<O : ObserverType>(_observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        rxAbstractMethod()
    }
}

读过一遍之后,我们可能会有这几个疑问:

  • 为什么要去判断isScheduleRequired?
  • 为什么需要创建SinkDisposer, 用它来包裹两个其他disposable?
  • 这两个disposable究竟是什么?

为什么要去判断isScheduleRequired?

要解释这个问题,我们的先了解什么是CurrentThreadScheduler ,他是拿来干啥的?

CurrentThreadScheduler抽象了在当前线程执行某个任务的这样一个功能。非常重要的一点是,他是一个 Serail scheduler 。也就是说我是串行执行的,因为我们可能在其他不同线程上dispatch了一些任务到这个线程。那要怎么保证串行执行呢?恩,我们定义一个pthread_key 。如果当前线程有任务再执行,那么我们在当前的线程中设置一个specific存储这个key和value。任务执行完成后,我们清除这个标记。当有其他任务一起进来时,会先判断有没有这个key,有的话表示线程正在执行任务,其他任务会被放到一个Queue中缓存起来,在当前任务执行完成之后执行。所以 isScheduleRequired 其实是一个防止并发的标记。我们来看看代码:

public static fileprivate(set) var isScheduleRequired: Bool {
    get {
        //从线程中获取key的值,如果是首次进入,那么会为nil,isScheduleRequired也就是true
        return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
    }
    set(isScheduleRequired) {
        if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
            rxFatalError("pthread_setspecific failed")
        }
    }
}
  public func schedule<StateType>(_state: StateType, action: @escaping(StateType) -> Disposable) -> Disposable {
      //如果当前没有任务在执行,isScheduleRequired = true
      if CurrentThreadScheduler.isScheduleRequired {
          //将其设置为false,就是设置 pthread_setspecific
          CurrentThreadScheduler.isScheduleRequired = false
          //执行任务,拿到disposable
          let disposable = action(state)

          defer {
              //任务执行完毕,将isScheduleRequired设置为true
              CurrentThreadScheduler.isScheduleRequired = true
              CurrentThreadScheduler.queue = nil
          }
          //如果当前任务执行完了,queue里也没有什么东西,退出
          guard let queue = CurrentThreadScheduler.queue else {
              return disposable
          }
	//queue里还有缓存的schduledItem, 遍历执行
          while let latest = queue.value.dequeue() {
              if latest.isDisposed {
                  continue
              }
              latest.invoke()
          }

          return disposable
      }
	//isScheduleRequired为false时执行这部分代码
      let existingQueue = CurrentThreadScheduler.queue

      let queue: RxMutableBox<Queue<ScheduledItemType>>
      if let existingQueue = existingQueue {
          queue = existingQueue
      }
      else {
          queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
          CurrentThreadScheduler.queue = queue
      }
//构建一个ScheduledItem入队
      let scheduledItem = ScheduledItem(action: action, state: state)
      queue.value.enqueue(scheduledItem)

      return scheduledItem
  }

好了,现在我们知道Producer处理了通用情况,在当前线程执行任务,并且通过isScheduleRequired处理了并发的情况。

为什么需要创建SinkDisposer, 用它来包裹两个其他disposable?

我的理解是用于取消。在subscribe过程中,我们可能创建许多的disposable,但是假如我们要取消,怎么办呢?没错,所有disposable都调用dispose就行啦。好的,那总有一个东西记录所有的disposable吧,没错,于是我们定义了SinkDisposable,他包裹了两个disposable。在取消时,调用sinkdisposable的dispose就行了,sinkdisposable又会去调用sink和subscribe返回的disposable的dispose。这样所有都释放完了,我们不用知道这个过程中生成了那些disposable。

///初始化一个SinkDisposer
let disposer = SinkDisposer()
//调用run,得到一个sink和subscription
let sinkAndSubscription = self.run(observer, cancel: disposer)
//将sink和subscription设置到disposer里
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
///sinkdisposer的dispose
func dispose() {
    let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)

    if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
        return
    }

    if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
        guard let sink = _sink else {
            rxFatalError("Sink not set")
        }
        guard let subscription = _subscription else {
            rxFatalError("Subscription not set")
        }
		//前面是一坨判断,用于处理已经dispose的情况,dispose可能被调多次,原因在于Sink的dispose。
        sink.dispose()
        subscription.dispose()

        _sink = nil
        _subscription = nil
    }
}

这两个disposable究竟是什么?

subscription这个disposable,我们很好理解。subscribe方法就会返回一个disposable。这里的就是那个disposable。

sink是一个observer+cancelable组合成的disposable。我们从上面可以看到,subscribe方法将具体的消息发送交给了run方法。所以我们需要在run方法里实现类似于Observer.on()这样的代码。而Sink就是对这部分代码的封装。他们都有一些共同的特性,比如都需要一个Observer来执行on,需要一个cancelabl来取消,都需要一个方法来执行Observer.on方法。将这些通用的东西提取出来就是Sink类。

class Sink<O:ObserverType> :Disposable{
    fileprivate let _observer: O
    fileprivate let _cancel: Cancelable
    fileprivate var _disposed: Bool

    init(observer: O, cancel: Cancelable) {
        _observer = observer
        _cancel = cancel
        _disposed = false
    }
    
    final func forwardOn(_event: Event<O.E>) {
        if _disposed {
            return
        }
        _observer.on(event)
    }
    //SinkForward也封装了on方法,和forwardOn的区别在于,会调用cancel.dispose。
    final func forwarder() -> SinkForward<O> {
        return SinkForward(forward: self)
    }

    final var disposed: Bool {
        return _disposed
    }

    func dispose() {
        _disposed = true
        _cancel.dispose()
    }
}
///只是简单的对on方法的封装
final class SinkForward<O:ObserverType>:ObserverType{
    typealias E = O.E
    
    private let _forward: Sink<O>
    
    init(forward: Sink<O>) {
        _forward = forward
    }
    
    final func on(_event: Event<E>) {
        switch event {
        case .next:
            _forward._observer.on(event)
        case .error, .completed:
            _forward._observer.on(event)
            _forward._cancel.dispose()
        }
    }
}

实际例子

好了,现在来看看一个完整的例子,我们选简单的Just来分析下。Just只是简单的发送一个element而已。

public static func just(_element: E) -> Observable<E> {
    return Just(element: element)
}
//继承自producer
final fileprivate class Just<Element> :Producer<Element>{
private let _element: Element

init(element: Element) {
    _element = element
}
///做了优化,不是重写run方法,而是直接重写subscribe,observer.on,然后返回NopDisposable。
override func subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == Element {
    observer.on(.next(_element))
    observer.on(.completed)
    return Disposables.create()
}
}
public static func just(_element: E, scheduler: ImmediateSchedulerType) -> Observable<E> {
    return JustScheduled(element: element, scheduler: scheduler)
}
//也是继承producer
final fileprivate class JustScheduled<Element> :Producer<Element>{
    fileprivate let _scheduler: ImmediateSchedulerType
    fileprivate let _element: Element

    init(element: Element, scheduler: ImmediateSchedulerType) {
        _scheduler = scheduler
        _element = element
    }
  //重写run方法,将observer.on这部分代码,交给Sink去实现
    override func run<O : ObserverType>(_observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
        //sink持有cancelable,也就是Producer中的SinkDispose。当on接收了所有sequence之后调用了Sink的dispose,Sink的dispose会调用cancelable的dispose。cancelable实际上有持有sink和subscruption。此时sink是已经dispose的了,不会再次执行,subscruption的dispose正常执行。
        let sink = JustScheduledSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}
final fileprivate class JustScheduledSink<O:ObserverType> :Sink<O>{
    typealias Parent = JustScheduled<O.E>

    private let _parent: Parent

    init(parent: Parent, observer: O, cancel: Cancelable) {
        _parent = parent
        super.init(observer: observer, cancel: cancel)
    }
    //调用Observer.on
    func run() -> Disposable {
        let scheduler = _parent._scheduler
        return scheduler.schedule(_parent._element) { element in
            self.forwardOn(.next(element))
            return scheduler.schedule(()) { _ in
                self.forwardOn(.completed)
				//既然sinkDisposer都已经释放了,我们自己创建一个什么也不做的disposable返回
                self.dispose()
                return Disposables.create()
            }
        }
    }
}

好了,其实很多rxswift operater都是按照这样写法完成的,我们其实可以先不看他的实现,自己先按照这一套东西写一遍,再比对,可以加深我们对泛型的理解还有具体的实现。

Subject

在消息传递的线路中,我们可能会需要这样的一种中间节点。他从别的地方接收消息(也可能自己直接onEvent),然后将这个消息分发给其他多个Observer。相当于一个中间的消息转发装置。Subject就是对这样的节点的一个抽象。

由于他能接收消息,所以应该实现ObserverType协议。他又能将消息发送给其他多个Observer,所以他还必须是一个Observable 。同时他还能断开这个节点,所以他还需要是一个disposable,在dispos时移除所有observer。

public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType {
    public typealias SubjectObserverType = PublishSubject<Element>

    typealias Observers = AnyObserver<Element>.s
    typealias DisposeKey = Observers.KeyType
    
    /// Indicates whether the subject has any observers
    public var hasObservers: Bool {
        _lock.lock()
        let count = _observers.count > 0
        _lock.unlock()
        return count
    }
    
    private let _lock = RecursiveLock()
    
    // state
    private var _isDisposed = false
    private var _observers = Observers()//一个Bag,保存了所有Observer的on方法
    private var _stopped = false
    private var _stoppedEvent = nil as Event<Element>?

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    /// Indicates whether the subject has been isDisposed.
    public var isDisposed: Bool {
        return _isDisposed
    }
    
    /// Creates a subject.
    public override init() {
        super.init()
        #if TRACE_RESOURCES
            _ = Resources.incrementTotal()
        #endif
    }
    
    /// 可以通过on方法接收事件,接收后,如果连接的有任何observer会将event发送给他们
    public func on(_event: Event<Element>) {
        dispatch(_synchronized_on(event), event)
    }

    func _synchronized_on(_event: Event<E>) -> Observers {
        _lock.lock(); defer { _lock.unlock() }
        switch event {
        case .next(_):
            if _isDisposed || _stopped {
                return Observers()
            }
            
            return _observers
        case .completed, .error:
            if _stoppedEvent == nil {
                _stoppedEvent = event
                _stopped = true
                let observers = _observers
                _observers.removeAll()
                return observers
            }

            return Observers()
        }
    }
    
    //当subjectsubscribe某个observer,实际上是将其添加到了自己的Observers中。
    public override func subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == Element {
        _lock.lock()
        let subscription = _synchronized_subscribe(observer)
        _lock.unlock()
        return subscription
    }

    func _synchronized_subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == E {
        //如果已经stop了,返回一个stoppedEvent
        if let stoppedEvent = _stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        //如果已经dispose了,返回一个Error
        if _isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        //将observer的on方法插入observers,并得到一个key,返回一个SubscriptionDisposable,在他dispose时会通过key移除对应的on。
        let key = _observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }
    //SubscriptionDisposable dispose时会调用他
    func synchronizedUnsubscribe(_disposeKey: DisposeKey) {
        _lock.lock()
        _synchronized_unsubscribe(disposeKey)
        _lock.unlock()
    }

    func _synchronized_unsubscribe(_disposeKey: DisposeKey) {
        //通过key移除对应的on。
        _ = _observers.removeKey(disposeKey)
    }
    
    /// Returns observer interface for subject.
    public func asObserver() -> PublishSubject<Element> {
        return self
    }
    
    /// Unsubscribe all observers and release resources.
    public func dispose() {
        _lock.lock()
        _synchronized_dispose()
        _lock.unlock()
    }

    final func _synchronized_dispose() {
        _isDisposed = true
        _observers.removeAll()
        _stoppedEvent = nil
    }

    #if TRACE_RESOURCES
        deinit {
            _ = Resources.decrementTotal()
        }
    #endif
}

还有其他几种subject,他们都是处理某些特殊情况,比如:如果我的observer是在事件流动的过程中加入的,那么我想加入后能够立即收到SUbject最后收到的那个消息,这就是 BehaviorSubject 完成的工作。还有ReplaySubject和AsyncSubject,都比较简单。

谢谢你请我吃糖果


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Lighttpd

Lighttpd

Andre Bogus / Packt Publishing / 2008-10 / 39.99

This is your fast guide to getting started and getting inside the Lighttpd web server. Written from a developer's perspective, this book helps you understand Lighttpd, and get it set up as securely an......一起来看看 《Lighttpd》 这本书的介绍吧!

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具