问题1 在看代码的时候,发现代码中使用到了
1 2 3 4 5 6 7 8 9 10 11 12 13 - (void)viewDidload {     [super viewDidload];     [self bindData]; }    - (void)bindData {     [[RACObserve(self, propertyA) ignore:nil]                                   subscribeNext:^(NSArray *dataA) {         NSLog(@"use dataA");     }]; } 
但是在这个类的propertyA是在init之后去设置的,在viewDidload之前。也就是在使用RAC订阅属性变化信号之前,但是use dataA打印出来了。猜测RACObserve宏生成信号在调用subscribeNext中,直接就调用了dataA的block的逻辑。但是感觉比较奇怪,不应该是propertyA变化的时候才会调用dataA的block的逻辑吗。
现在具体看一下,一个信号的创建和订阅的源码:
信号创建: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber)) didSubscribe {       return [RACDynamicSignal createSignal:didSubscribe];   } + (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber)) didSubscribe {       RACDynamicSignal *signal = [[self alloc] init];       signal->_didSubscribe = [didSubscribe copy];       return [signal setNameWithFormat:@"+createSignal:"]; } 
在创建一个信号的时候,会传进来一个叫didSubscribe的block,该信号会把它存下来。
信号订阅 RACSignal的subscribeNext方法:
1 2 3 4 5 - (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {     NSCParameterAssert(nextBlock != NULL);     RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];     return [self subscribe:o]; } 
在singal的subscribeNext中,生成了一个subscriber。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 + (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError  *error))error completed:(void (^)(void))completed {       RACSubscriber *subscriber = [[self alloc] init];         subscriber->_next = [next copy];       subscriber->_error = [error copy];       subscriber->_completed = [completed copy];       return subscriber; } 
subscriber保存了nextBlock,errorBlock,completedBlock等数据信息
接着看signal的subscribe方法,改方法的参数是subscribeNext方法中生成的subscriber对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {       NSCParameterAssert(subscriber != nil);       RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];       subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];         if (self.didSubscribe != NULL) {           RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{               RACDisposable *innerDisposable = self.didSubscribe(subscriber);               [disposable addDisposable:innerDisposable];           }];             [disposable addDisposable:schedulingDisposable];       }       return disposable;   } 
1 2 3 4 5 6 7 8 9 10 - (RACDisposable *)schedule:(void (^)(void))block {     NSCParameterAssert(block != NULL);       if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];       block();       return nil;   } 
在signal的subscribe方法中,调用了RACScheduler.subscriptionScheduler schedule 方法,直接就将传入的block调用了,最终调用了signal的didSubscribe block,将subscriber传入。
再看一下RACObserve在生成一个signal的时候,传入的didSubscribe block逻辑的怎样的,以下是RACObserve相关源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #define RACObserve(TARGET, KEYPATH) \       ({ \           _Pragma("clang diagnostic push") \           _Pragma("clang diagnostic ignored \"-Wreceiver-is-weak\"") \           __weak id target_ = (TARGET); \           [target_ rac_valuesForKeyPath:@keypath(TARGET, KEYPATH) observer:self];          \           _Pragma("clang diagnostic pop") \       }) 
在NSObject的RACPropertySubscribing分类中定义rac_valuesForKeyPath:observer:self:方法
继续:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 - (RACSignal *)rac_valuesForKeyPath:(NSString *)keyPath observer:(__weak      NSObject *)observer {       return [[[self           rac_valuesAndChangesForKeyPath:keyPath options:NSKeyValueObservingOptionInitial observer:observer]           map:^(RACTuple *value) {               // -map: because it doesn't require the block trampoline that -reduceEach: uses               return value[0];           }]           setNameWithFormat:@"RACObserve(%@, %@)", self.rac_description, keyPath];   } 
继续:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 - (RACSignal *)rac_valuesAndChangesForKeyPath:(NSString *)keyPath options:(     NSKeyValueObservingOptions)options observer:(__weak NSObject *)     weakObserver {       NSObject *strongObserver = weakObserver;       keyPath = [keyPath copy];       NSRecursiveLock *objectLock = [[NSRecursiveLock alloc] init];       objectLock.name = @"     org.reactivecocoa.ReactiveCocoa.NSObjectRACPropertySubscribing";       __weak NSObject *weakSelf = self;       RACSignal *deallocSignal = [[RACSignal           zip:@[               self.rac_willDeallocSignal,               strongObserver.rac_willDeallocSignal ?: [RACSignal never]           ]]           doCompleted:^{               // Forces deallocation to wait if the object variables are currently               // being read on another thread.               [objectLock lock];               @onExit {                   [objectLock unlock];               };           }];   //重点关注这里,createSignal之后的参数就是该信号的didSubscribe block逻辑了。     return [[[RACSignal           createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {               // Hold onto the lock the whole time we're setting up the KVO               // observation, because any resurrection that might be caused by our               // retaining below must be balanced out by the time -dealloc returns               // (if another thread is waiting on the lock above).               [objectLock lock];               @onExit {                   [objectLock unlock];               };               __strong NSObject *observer __attribute__((objc_precise_lifetime))              = weakObserver;               __strong NSObject *self __attribute__((objc_precise_lifetime)) =              weakSelf;               if (self == nil) {                   [subscriber sendCompleted];                   return nil;               }                 return [self rac_observeKeyPath:keyPath options:options observer:             observer block:^(id value, NSDictionary *change, BOOL                  causedByDealloc, BOOL affectedOnlyLastComponent) {                   [subscriber sendNext:RACTuplePack(value, change)];               }];           }]           takeUntil:deallocSignal]           setNameWithFormat:@"%@ -rac_valueAndChangesForKeyPath: %@ options: %lu          observer: %@", self.rac_description, keyPath, (unsigned long)options,          strongObserver.rac_description];   } 
可以看到在RACObserver宏定义的signal的didSubscriber block中又调用了rac_observeKeyPath:keyPath options: observer: block
继续(太长了只贴重点)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 - (RACDisposable *)rac_observeKeyPath:(NSString *)keyPath options:(     NSKeyValueObservingOptions)options observer:(__weak NSObject *)     weakObserver block:(void (^)(id, NSDictionary *, BOOL, BOOL))block {       NSCParameterAssert(block != nil);       NSCParameterAssert(keyPath.rac_keyPathComponents.count > 0);       //省略数十行     // Call the block with the initial value if needed.       if ((options & NSKeyValueObservingOptionInitial) != 0) {           id initialValue = [self valueForKeyPath:keyPath];           NSDictionary *initialChange = @{               NSKeyValueChangeKindKey: @(NSKeyValueChangeSetting),               NSKeyValueChangeNewKey: initialValue ?: NSNull.null,           };           block(initialValue, initialChange, NO, keyPathHasOneComponent);       }           //省略数十行   } 
说明一下,options是NSKeyValueObservingOptions属于NS_OPTIONS
1 2 3 4 5 6 7 8 9 10 typedef NS_OPTIONS(NSUInteger, NSKeyValueObservingOptions) {       NSKeyValueObservingOptionNew = 0x01,       NSKeyValueObservingOptionOld = 0x02,       NSKeyValueObservingOptionInitial NS_ENUM_AVAILABLE(10_5, 2_0) = 0x04,       NSKeyValueObservingOptionPrior NS_ENUM_AVAILABLE(10_5, 2_0) = 0x08 }; 
在以上方法中,它判断了,传入的options是否是NSKeyValueObservingOptionInitial类型,而在调用rac_observeKeyPath: options: observer: block:的时候,option就是传的NSKeyValueObservingOptionInitial,所以会直接调用传进来的block,在rac_valuesAndChangesForKeyPath: options: observer:中调用rac_observeKeyPath: options: observer: block:的时候传入block里面的逻辑是这样:
1 [subscriber sendNext:RACTuplePack(value, change)]; 
综上所述,RACObserver生成的signal在调用subscribeNext方法订阅该信号的时候,会直接调用一次订阅信号之后next block的逻辑,所以即便是属性变化之后订阅属性变化信号,它也会默认先调用一次next block的逻辑。
正常kvo检测转化成信号的逻辑:
在RACObserver初始化的过程中,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 - (RACDisposable *)rac_observeKeyPath:(NSString *)keyPath options:(     NSKeyValueObservingOptions)options observer:(__weak NSObject *)     weakObserver block:(void (^)(id, NSDictionary *, BOOL, BOOL))block {       NSCParameterAssert(block != nil);       NSCParameterAssert(keyPath.rac_keyPathComponents.count > 0);       //省略数十行       NSKeyValueObservingOptions trampolineOptions = (options |          NSKeyValueObservingOptionPrior) & ~NSKeyValueObservingOptionInitial;       RACKVOTrampoline *trampoline = [[RACKVOTrampoline alloc] initWithTarget:     self observer:strongObserver keyPath:keyPathHead options:trampolineOptions      block:^(id trampolineTarget, id trampolineObserver, NSDictionary *change) {           // If this is a prior notification, clean up all the callbacks added to the           // previous value and call the callback block. Everything else is deferred           // until after we get the notification after the change.           if ([change[NSKeyValueChangeNotificationIsPriorKey] boolValue]) {               [firstComponentDisposable() dispose];               if ((options & NSKeyValueObservingOptionPrior) != 0) {                   block([trampolineTarget valueForKeyPath:keyPath], change, NO,                      keyPathHasOneComponent);               }             return;           }           // From here the notification is not prior.           NSObject *value = [trampolineTarget valueForKey:keyPathHead];             // If the value has changed but is nil, there is no need to add callbacks to           // it, just call the callback block.           if (value == nil) {             block(nil, change, NO, keyPathHasOneComponent);               return;         }           // From here the notification is not prior and the value is not nil.             // Create a new firstComponentDisposable while getting rid of the old one at           // the same time, in case this is being called concurrently.           RACDisposable *oldFirstComponentDisposable = [         firstComponentSerialDisposable swapInDisposable:[RACCompoundDisposable          compoundDisposable]];           [oldFirstComponentDisposable dispose];           addDeallocObserverToPropertyValue(value);             // If there are no further key path components, there is no need to add the           // other callbacks, just call the callback block with the value itself.           if (keyPathHasOneComponent) {               block(value, change, NO, keyPathHasOneComponent);               return;           }           // The value has changed, is not nil, and there are more key path components           // to consider. Add the callbacks to the value for the remaining key path           // components and call the callback block with the current value of the full           // key path.           addObserverToValue(value);           block([value valueForKeyPath:keyPathTail], change, NO, keyPathHasOneComponent);       }];       // Stop the KVO observation when this one is disposed of.       [disposable addDisposable:trampoline];        //省略数十行   } 
在该方法中生成了一个RACKVOTrampoline中间对象,看它的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 - (id)initWithTarget:(__weak NSObject *)target observer:(__weak NSObject *) observer keyPath:(NSString *)keyPath options:(NSKeyValueObservingOptions) options block:(RACKVOBlock)block {       NSCParameterAssert(keyPath != nil);       NSCParameterAssert(block != nil);       NSObject *strongTarget = target;       if (strongTarget == nil) return nil;       self = [super init];       if (self == nil) return nil;     _keyPath = [keyPath copy];       _block = [block copy];       _weakTarget = target;       _unsafeTarget = strongTarget;       _observer = observer;       [RACKVOProxy.sharedProxy addObserver:self forContext:(__bridge void *)self];       [strongTarget addObserver:RACKVOProxy.sharedProxy forKeyPath:self.keyPath      options:options context:(__bridge void *)self];       [strongTarget.rac_deallocDisposable addDisposable:self];       [self.observer.rac_deallocDisposable addDisposable:self];     return self; }   - (void)dealloc {       [self dispose];   }   #pragma mark Observation   - (void)dispose {       NSObject *target;       NSObject *observer;       @synchronized (self) {           _block = nil;             // The target should still exist at this point, because we still need to           // tear down its KVO observation. Therefore, we can use the unsafe           // reference (and need to, because the weak one will have been zeroed by           // now).           target = self.unsafeTarget;           observer = self.observer;                 _unsafeTarget = nil;           _observer = nil;       }       [target.rac_deallocDisposable removeDisposable:self];       [observer.rac_deallocDisposable removeDisposable:self];       [target removeObserver:RACKVOProxy.sharedProxy forKeyPath:self.keyPath      context:(__bridge void *)self];       [RACKVOProxy.sharedProxy removeObserver:self forContext:(__bridge void *)self]; }     //系统的代理方法,其实是由RACKVOProxy.sharedProxy对象转发的,RACKVOProxy.sharedProxy才是真正处理系统消息的对象。 - (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(     NSDictionary *)change context:(void *)context {       if (context != (__bridge void *)self) {           [super observeValueForKeyPath:keyPath ofObject:object change:change          context:context];           return;       }       RACKVOBlock block;       id observer;       id target;       @synchronized (self) {           block = self.block;           observer = self.observer;           target = self.weakTarget;       }       if (block == nil || target == nil) return;       block(target, observer, change);   } 
可以看到RACKVOTrampoline对象替代原来使用KVO的对象,作为系统的代理,实现了代理方法。实际上,真正调用系统KVO注册的方法的时候,是往一个叫RACKVOProxy.sharedProxy的全局单例对象注册的。RACKVOTrampoline是具体处理KVO消息的对象,在RACKVOPorxy.shareProxy对象中注册了所有使用RAC KVO的系统消息,再由它转发给具体的RACKVOTrampoline进行处理,而在RACKVOTrampoline处理的时候,调用了RACKVOtrampoline初始化的时候传进来的block。之后在RACKVOTrampoline参数block调用过程中就会调用sendNext方法了,往外面发信号数据。
以下是RACKVOProxy.sharedProxy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 @interface RACKVOProxy()   @property (strong, nonatomic, readonly) NSMapTable *trampolines;   @property (strong, nonatomic, readonly) dispatch_queue_t queue;     @end   @implementation RACKVOProxy     + (instancetype)sharedProxy {       static RACKVOProxy *proxy;       static dispatch_once_t onceToken;             dispatch_once(&onceToken, ^{           proxy = [[self alloc] init];       });             return proxy;   }   - (instancetype)init {       self = [super init];       if (self == nil) return nil;             _queue = dispatch_queue_create("org.reactivecocoa.ReactiveCocoa.RACKVOProxy     ", DISPATCH_QUEUE_SERIAL);       _trampolines = [NSMapTable strongToWeakObjectsMapTable];             return self;   }   - (void)addObserver:(__weak NSObject *)observer forContext:(void *)context {       NSValue *valueContext = [NSValue valueWithPointer:context];             dispatch_sync(self.queue, ^{           [self.trampolines setObject:observer forKey:valueContext];       });   }   - (void)removeObserver:(NSObject *)observer forContext:(void *)context {       NSValue *valueContext = [NSValue valueWithPointer:context];             dispatch_sync(self.queue, ^{           [self.trampolines removeObjectForKey:valueContext];       });   }   - (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(     NSDictionary *)change context:(void *)context {       NSValue *valueContext = [NSValue valueWithPointer:context];       __block NSObject *trueObserver;             dispatch_sync(self.queue, ^{           trueObserver = [self.trampolines objectForKey:valueContext];       });           if (trueObserver != nil) {           [trueObserver observeValueForKeyPath:keyPath ofObject:object change:         change context:context];       }   } 
RACKVOProxy.sharedProxy管理了整个RAC 中KVO的处理系统KVO消息的中间对象和系统KVO消息的转发。
综合上面的代码可以看出,正是由于各种中间对象替用户实现了代理方法起了代理对象的作用,用户才能把代码写的更加紧凑清晰。
问题2 看以下代码,假设combineLatest之后得到的信号是A
1 2 3 4 5 [[RACSignal combineLatest:@[[RACObserve(self, propertyA) ignore:nil], [ RACObserve(self, propertyB) ignore:nil]]] subscribeNext:^(RACTuple *tuple) {     }]; 
1.使用combineLatest的时候,第一次订阅会不会触发subscribeNext后面的block
2.combineLatest中的信号,是同时调用了sendNext之后会触发A调用sendNext,还是只需要其中有一个信号调用了sendNext会触发A调用sendNext
看一下combineLatest源码:
1 2 3 4 5 6 7 8 + (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals {       return [[self join:signals block:^(RACSignal *left, RACSignal *right) {           return [left combineLatestWith:right];       }] setNameWithFormat:@"+combineLatest: %@", signals]; } 
继续 join: block:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 + (instancetype)join:(id<NSFastEnumeration>)streams block:(RACStream * (^)(id,      id))block {     //第一段     RACStream *current = nil;     // Creates streams of successively larger tuples by combining the input       // streams one-by-one.       for (RACStream *stream in streams) {           // For the first stream, just wrap its values in a RACTuple. That way,           // if only one stream is given, the result is still a stream of tuples.           if (current == nil) {               current = [stream map:^(id x) {                   return RACTuplePack(x);               }];             continue;         }           current = block(current, stream);       }     if (current == nil) return [self empty]; //第二段     return [current map:^(RACTuple *xs) {           // Right now, each value is contained in its own tuple, sorta like:           //           // (((1), 2), 3)           //           // We need to unwrap all the layers and create a tuple out of the result.           NSMutableArray *values = [[NSMutableArray alloc] init];           while (xs != nil) {             [values insertObject:xs.last ?: RACTupleNil.tupleNil atIndex:0];               xs = (xs.count > 1 ? xs.first : nil);         }           return [RACTuple tupleWithObjectsFromArray:values];     }]; } 
这部分代码分2段,第一段是将两个信号合并的逻辑,具体的合并逻辑是由外面传进来的block确定的。第二段是通过map将信号的值重新做了处理,第一段得到的信号属于signal of signals的类型,第二段将它打平。
再看一下combineLatestWith:方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 - (RACSignal *)combineLatestWith:(RACSignal *)signal {     NSCParameterAssert(signal != nil);       return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {           RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];           __block id lastSelfValue = nil;           __block BOOL selfCompleted = NO;           __block id lastOtherValue = nil;           __block BOOL otherCompleted = NO;           void (^sendNext)(void) = ^{               @synchronized (disposable) {                   if (lastSelfValue == nil || lastOtherValue == nil) return;                   [subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)];               }           };           RACDisposable *selfDisposable = [self subscribeNext:^(id x) {               @synchronized (disposable) {                   lastSelfValue = x ?: RACTupleNil.tupleNil;                   sendNext();               }           } error:^(NSError *error) {               [subscriber sendError:error];           } completed:^{               @synchronized (disposable) {                   selfCompleted = YES;                   if (otherCompleted) [subscriber sendCompleted];               }           }];           [disposable addDisposable:selfDisposable];           RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {               @synchronized (disposable) {                   lastOtherValue = x ?: RACTupleNil.tupleNil;                   sendNext();               }           } error:^(NSError *error) {               [subscriber sendError:error];           } completed:^{               @synchronized (disposable) {                   otherCompleted = YES;                   if (selfCompleted) [subscriber sendCompleted];               }           }];           [disposable addDisposable:otherDisposable];         return disposable;       }] setNameWithFormat:@"[%@] -combineLatestWith: %@", self.name, signal]; } 
在以上代码中,调用了当前信号的subscribeNext方法,同时也调用了需要合并的信号的subscribeNext方法。subscribeNext方法block中调用了sendNext block,这个block是在combineLatestWith中定义,判断两个信号是否已经调用过sendNext,如果都同时掉用过sendNext就会触发combineLatest信号调用didSubscribe block,最终触发订阅combineLatest信号的传入的subscribeNext后的block。
综合上面的分析,类似于以下的使用方式
1 2 3 [[RACSignal combineLatest:@[[RACObserve(self, propertyA) ignore:nil], [RACObserve(self, propertyB) ignore:nil]]] subscribeNext:^(RACTuple *tuple) {   }]; 
第一次订阅就会触发subscribeNext之后的block逻辑,并且是RACObserve这种类型的combineLatest才会,最上面已经分析了RACObserver生成的信号在第一次订阅调用的时候信号就会调用sendNext。