问题1

在看代码的时候,发现代码中使用到了

1
2
3
4
5
6
7
8
9
10
11
12
13
- (void)viewDidload
{
[super viewDidload];
[self bindData];
}

- (void)bindData
{
[[RACObserve(self, propertyA) ignore:nil]
subscribeNext:^(NSArray *dataA) {
NSLog(@"use dataA");
}];
}

但是在这个类的propertyA是在init之后去设置的,在viewDidload之前。也就是在使用RAC订阅属性变化信号之前,但是use dataA打印出来了。猜测RACObserve宏生成信号在调用subscribeNext中,直接就调用了dataA的block的逻辑。但是感觉比较奇怪,不应该是propertyA变化的时候才会调用dataA的block的逻辑吗。

现在具体看一下,一个信号的创建和订阅的源码:

信号创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))
didSubscribe {

return [RACDynamicSignal createSignal:didSubscribe];

}
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))
didSubscribe {

RACDynamicSignal *signal = [[self alloc] init];

signal->_didSubscribe = [didSubscribe copy];

return [signal setNameWithFormat:@"+createSignal:"];
}

在创建一个信号的时候,会传进来一个叫didSubscribe的block,该信号会把它存下来。

信号订阅

RACSignal的subscribeNext方法:

1
2
3
4
5
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}

在singal的subscribeNext中,生成了一个subscriber。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError
*error))error completed:(void (^)(void))completed {

RACSubscriber *subscriber = [[self alloc] init];


subscriber->_next = [next copy];

subscriber->_error = [error copy];

subscriber->_completed = [completed copy];

return subscriber;
}

subscriber保存了nextBlock,errorBlock,completedBlock等数据信息

接着看signal的subscribe方法,改方法的参数是subscribeNext方法中生成的subscriber对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {

NSCParameterAssert(subscriber != nil);

RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];


if (self.didSubscribe != NULL) {

RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{

RACDisposable *innerDisposable = self.didSubscribe(subscriber);

[disposable addDisposable:innerDisposable];

}];


[disposable addDisposable:schedulingDisposable];

}

return disposable;

}
1
2
3
4
5
6
7
8
9
10
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);

if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];

block();

return nil;

}

在signal的subscribe方法中,调用了RACScheduler.subscriptionScheduler schedule 方法,直接就将传入的block调用了,最终调用了signal的didSubscribe block,将subscriber传入。

再看一下RACObserve在生成一个signal的时候,传入的didSubscribe block逻辑的怎样的,以下是RACObserve相关源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#define RACObserve(TARGET, KEYPATH) \

({ \

_Pragma("clang diagnostic push") \

_Pragma("clang diagnostic ignored \"-Wreceiver-is-weak\"") \

__weak id target_ = (TARGET); \

[target_ rac_valuesForKeyPath:@keypath(TARGET, KEYPATH) observer:self];
\

_Pragma("clang diagnostic pop") \

})

在NSObject的RACPropertySubscribing分类中定义rac_valuesForKeyPath:observer:self:方法

继续:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- (RACSignal *)rac_valuesForKeyPath:(NSString *)keyPath observer:(__weak 
NSObject *)observer {

return [[[self

rac_valuesAndChangesForKeyPath:keyPath options:NSKeyValueObservingOptionInitial observer:observer]

map:^(RACTuple *value) {

// -map: because it doesn't require the block trampoline that -reduceEach: uses

return value[0];

}]

setNameWithFormat:@"RACObserve(%@, %@)", self.rac_description, keyPath];

}

继续:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
- (RACSignal *)rac_valuesAndChangesForKeyPath:(NSString *)keyPath options:(
NSKeyValueObservingOptions)options observer:(__weak NSObject *)
weakObserver {

NSObject *strongObserver = weakObserver;

keyPath = [keyPath copy];

NSRecursiveLock *objectLock = [[NSRecursiveLock alloc] init];

objectLock.name = @"
org.reactivecocoa.ReactiveCocoa.NSObjectRACPropertySubscribing";

__weak NSObject *weakSelf = self;

RACSignal *deallocSignal = [[RACSignal

zip:@[

self.rac_willDeallocSignal,

strongObserver.rac_willDeallocSignal ?: [RACSignal never]

]]

doCompleted:^{

// Forces deallocation to wait if the object variables are currently

// being read on another thread.

[objectLock lock];

@onExit {

[objectLock unlock];

};

}];

//重点关注这里,createSignal之后的参数就是该信号的didSubscribe block逻辑了。
return [[[RACSignal

createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {

// Hold onto the lock the whole time we're setting up the KVO

// observation, because any resurrection that might be caused by our

// retaining below must be balanced out by the time -dealloc returns

// (if another thread is waiting on the lock above).

[objectLock lock];

@onExit {

[objectLock unlock];

};

__strong NSObject *observer __attribute__((objc_precise_lifetime))
= weakObserver;

__strong NSObject *self __attribute__((objc_precise_lifetime)) =
weakSelf;

if (self == nil) {

[subscriber sendCompleted];

return nil;

}


return [self rac_observeKeyPath:keyPath options:options observer:
observer block:^(id value, NSDictionary *change, BOOL
causedByDealloc, BOOL affectedOnlyLastComponent) {

[subscriber sendNext:RACTuplePack(value, change)];

}];

}]

takeUntil:deallocSignal]

setNameWithFormat:@"%@ -rac_valueAndChangesForKeyPath: %@ options: %lu
observer: %@", self.rac_description, keyPath, (unsigned long)options,
strongObserver.rac_description];

}

可以看到在RACObserver宏定义的signal的didSubscriber block中又调用了rac_observeKeyPath:keyPath options: observer: block

继续(太长了只贴重点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
- (RACDisposable *)rac_observeKeyPath:(NSString *)keyPath options:(
NSKeyValueObservingOptions)options observer:(__weak NSObject *)
weakObserver block:(void (^)(id, NSDictionary *, BOOL, BOOL))block {

NSCParameterAssert(block != nil);

NSCParameterAssert(keyPath.rac_keyPathComponents.count > 0);

//省略数十行
// Call the block with the initial value if needed.

if ((options & NSKeyValueObservingOptionInitial) != 0) {

id initialValue = [self valueForKeyPath:keyPath];

NSDictionary *initialChange = @{

NSKeyValueChangeKindKey: @(NSKeyValueChangeSetting),

NSKeyValueChangeNewKey: initialValue ?: NSNull.null,

};

block(initialValue, initialChange, NO, keyPathHasOneComponent);

}

//省略数十行

}

说明一下,options是NSKeyValueObservingOptions属于NS_OPTIONS

1
2
3
4
5
6
7
8
9
10
typedef NS_OPTIONS(NSUInteger, NSKeyValueObservingOptions) {

NSKeyValueObservingOptionNew = 0x01,

NSKeyValueObservingOptionOld = 0x02,

NSKeyValueObservingOptionInitial NS_ENUM_AVAILABLE(10_5, 2_0) = 0x04,

NSKeyValueObservingOptionPrior NS_ENUM_AVAILABLE(10_5, 2_0) = 0x08
};

在以上方法中,它判断了,传入的options是否是NSKeyValueObservingOptionInitial类型,而在调用rac_observeKeyPath: options: observer: block:的时候,option就是传的NSKeyValueObservingOptionInitial,所以会直接调用传进来的block,在rac_valuesAndChangesForKeyPath: options: observer:中调用rac_observeKeyPath: options: observer: block:的时候传入block里面的逻辑是这样:

1
[subscriber sendNext:RACTuplePack(value, change)];

综上所述,RACObserver生成的signal在调用subscribeNext方法订阅该信号的时候,会直接调用一次订阅信号之后next block的逻辑,所以即便是属性变化之后订阅属性变化信号,它也会默认先调用一次next block的逻辑。

正常kvo检测转化成信号的逻辑:

在RACObserver初始化的过程中,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
- (RACDisposable *)rac_observeKeyPath:(NSString *)keyPath options:(
NSKeyValueObservingOptions)options observer:(__weak NSObject *)
weakObserver block:(void (^)(id, NSDictionary *, BOOL, BOOL))block {

NSCParameterAssert(block != nil);

NSCParameterAssert(keyPath.rac_keyPathComponents.count > 0);

//省略数十行

NSKeyValueObservingOptions trampolineOptions = (options |
NSKeyValueObservingOptionPrior) & ~NSKeyValueObservingOptionInitial;

RACKVOTrampoline *trampoline = [[RACKVOTrampoline alloc] initWithTarget:
self observer:strongObserver keyPath:keyPathHead options:trampolineOptions
block:^(id trampolineTarget, id trampolineObserver, NSDictionary *change) {

// If this is a prior notification, clean up all the callbacks added to the

// previous value and call the callback block. Everything else is deferred

// until after we get the notification after the change.

if ([change[NSKeyValueChangeNotificationIsPriorKey] boolValue]) {

[firstComponentDisposable() dispose];

if ((options & NSKeyValueObservingOptionPrior) != 0) {

block([trampolineTarget valueForKeyPath:keyPath], change, NO,
keyPathHasOneComponent);

}
return;

}

// From here the notification is not prior.

NSObject *value = [trampolineTarget valueForKey:keyPathHead];


// If the value has changed but is nil, there is no need to add callbacks to

// it, just call the callback block.

if (value == nil) {
block(nil, change, NO, keyPathHasOneComponent);

return;
}

// From here the notification is not prior and the value is not nil.


// Create a new firstComponentDisposable while getting rid of the old one at

// the same time, in case this is being called concurrently.

RACDisposable *oldFirstComponentDisposable = [
firstComponentSerialDisposable swapInDisposable:[RACCompoundDisposable
compoundDisposable]];

[oldFirstComponentDisposable dispose];

addDeallocObserverToPropertyValue(value);


// If there are no further key path components, there is no need to add the

// other callbacks, just call the callback block with the value itself.

if (keyPathHasOneComponent) {

block(value, change, NO, keyPathHasOneComponent);

return;

}

// The value has changed, is not nil, and there are more key path components

// to consider. Add the callbacks to the value for the remaining key path

// components and call the callback block with the current value of the full

// key path.

addObserverToValue(value);

block([value valueForKeyPath:keyPathTail], change, NO, keyPathHasOneComponent);

}];

// Stop the KVO observation when this one is disposed of.

[disposable addDisposable:trampoline];

//省略数十行

}

在该方法中生成了一个RACKVOTrampoline中间对象,看它的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
- (id)initWithTarget:(__weak NSObject *)target observer:(__weak NSObject *)
observer keyPath:(NSString *)keyPath options:(NSKeyValueObservingOptions)
options block:(RACKVOBlock)block {

NSCParameterAssert(keyPath != nil);

NSCParameterAssert(block != nil);

NSObject *strongTarget = target;

if (strongTarget == nil) return nil;

self = [super init];

if (self == nil) return nil;
_keyPath = [keyPath copy];

_block = [block copy];

_weakTarget = target;

_unsafeTarget = strongTarget;

_observer = observer;

[RACKVOProxy.sharedProxy addObserver:self forContext:(__bridge void *)self];

[strongTarget addObserver:RACKVOProxy.sharedProxy forKeyPath:self.keyPath
options:options context:(__bridge void *)self];

[strongTarget.rac_deallocDisposable addDisposable:self];

[self.observer.rac_deallocDisposable addDisposable:self];
return self;
}

- (void)dealloc {

[self dispose];

}

#pragma mark Observation

- (void)dispose {

NSObject *target;

NSObject *observer;

@synchronized (self) {

_block = nil;


// The target should still exist at this point, because we still need to

// tear down its KVO observation. Therefore, we can use the unsafe

// reference (and need to, because the weak one will have been zeroed by

// now).

target = self.unsafeTarget;

observer = self.observer;




_unsafeTarget = nil;

_observer = nil;

}

[target.rac_deallocDisposable removeDisposable:self];

[observer.rac_deallocDisposable removeDisposable:self];

[target removeObserver:RACKVOProxy.sharedProxy forKeyPath:self.keyPath
context:(__bridge void *)self];

[RACKVOProxy.sharedProxy removeObserver:self forContext:(__bridge void *)self];
}


//系统的代理方法,其实是由RACKVOProxy.sharedProxy对象转发的,RACKVOProxy.sharedProxy才是真正处理系统消息的对象。
- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(
NSDictionary *)change context:(void *)context {

if (context != (__bridge void *)self) {

[super observeValueForKeyPath:keyPath ofObject:object change:change
context:context];

return;

}

RACKVOBlock block;

id observer;

id target;

@synchronized (self) {

block = self.block;

observer = self.observer;

target = self.weakTarget;

}

if (block == nil || target == nil) return;

block(target, observer, change);

}

可以看到RACKVOTrampoline对象替代原来使用KVO的对象,作为系统的代理,实现了代理方法。实际上,真正调用系统KVO注册的方法的时候,是往一个叫RACKVOProxy.sharedProxy的全局单例对象注册的。RACKVOTrampoline是具体处理KVO消息的对象,在RACKVOPorxy.shareProxy对象中注册了所有使用RAC KVO的系统消息,再由它转发给具体的RACKVOTrampoline进行处理,而在RACKVOTrampoline处理的时候,调用了RACKVOtrampoline初始化的时候传进来的block。之后在RACKVOTrampoline参数block调用过程中就会调用sendNext方法了,往外面发信号数据。

以下是RACKVOProxy.sharedProxy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
@interface RACKVOProxy()

@property (strong, nonatomic, readonly) NSMapTable *trampolines;

@property (strong, nonatomic, readonly) dispatch_queue_t queue;


@end

@implementation RACKVOProxy


+ (instancetype)sharedProxy {

static RACKVOProxy *proxy;

static dispatch_once_t onceToken;




dispatch_once(&onceToken, ^{

proxy = [[self alloc] init];

});




return proxy;

}

- (instancetype)init {

self = [super init];

if (self == nil) return nil;




_queue = dispatch_queue_create("org.reactivecocoa.ReactiveCocoa.RACKVOProxy
", DISPATCH_QUEUE_SERIAL);

_trampolines = [NSMapTable strongToWeakObjectsMapTable];




return self;

}

- (void)addObserver:(__weak NSObject *)observer forContext:(void *)context {

NSValue *valueContext = [NSValue valueWithPointer:context];




dispatch_sync(self.queue, ^{

[self.trampolines setObject:observer forKey:valueContext];

});

}

- (void)removeObserver:(NSObject *)observer forContext:(void *)context {

NSValue *valueContext = [NSValue valueWithPointer:context];




dispatch_sync(self.queue, ^{

[self.trampolines removeObjectForKey:valueContext];

});

}

- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(
NSDictionary *)change context:(void *)context {

NSValue *valueContext = [NSValue valueWithPointer:context];

__block NSObject *trueObserver;




dispatch_sync(self.queue, ^{

trueObserver = [self.trampolines objectForKey:valueContext];

});



if (trueObserver != nil) {

[trueObserver observeValueForKeyPath:keyPath ofObject:object change:
change context:context];

}

}

RACKVOProxy.sharedProxy管理了整个RAC 中KVO的处理系统KVO消息的中间对象和系统KVO消息的转发。

综合上面的代码可以看出,正是由于各种中间对象替用户实现了代理方法起了代理对象的作用,用户才能把代码写的更加紧凑清晰。

问题2

看以下代码,假设combineLatest之后得到的信号是A

1
2
3
4
5
[[RACSignal combineLatest:@[[RACObserve(self, propertyA) ignore:nil], [
RACObserve(self, propertyB) ignore:nil]]] subscribeNext:^(RACTuple *tuple) {


}];

1.使用combineLatest的时候,第一次订阅会不会触发subscribeNext后面的block

2.combineLatest中的信号,是同时调用了sendNext之后会触发A调用sendNext,还是只需要其中有一个信号调用了sendNext会触发A调用sendNext

看一下combineLatest源码:

1
2
3
4
5
6
7
8
+ (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals {

return [[self join:signals block:^(RACSignal *left, RACSignal *right) {

return [left combineLatestWith:right];

}] setNameWithFormat:@"+combineLatest: %@", signals];
}

继续 join: block:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
+ (instancetype)join:(id<NSFastEnumeration>)streams block:(RACStream * (^)(id, 
id))block {


//第一段
RACStream *current = nil;
// Creates streams of successively larger tuples by combining the input

// streams one-by-one.

for (RACStream *stream in streams) {

// For the first stream, just wrap its values in a RACTuple. That way,

// if only one stream is given, the result is still a stream of tuples.

if (current == nil) {

current = [stream map:^(id x) {

return RACTuplePack(x);

}];
continue;
}

current = block(current, stream);

}
if (current == nil) return [self empty];
//第二段
return [current map:^(RACTuple *xs) {

// Right now, each value is contained in its own tuple, sorta like:

//

// (((1), 2), 3)

//

// We need to unwrap all the layers and create a tuple out of the result.

NSMutableArray *values = [[NSMutableArray alloc] init];

while (xs != nil) {
[values insertObject:xs.last ?: RACTupleNil.tupleNil atIndex:0];

xs = (xs.count > 1 ? xs.first : nil);
}

return [RACTuple tupleWithObjectsFromArray:values];
}];
}

这部分代码分2段,第一段是将两个信号合并的逻辑,具体的合并逻辑是由外面传进来的block确定的。第二段是通过map将信号的值重新做了处理,第一段得到的信号属于signal of signals的类型,第二段将它打平。

再看一下combineLatestWith:方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
- (RACSignal *)combineLatestWith:(RACSignal *)signal {
NSCParameterAssert(signal != nil);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {

RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

__block id lastSelfValue = nil;

__block BOOL selfCompleted = NO;

__block id lastOtherValue = nil;

__block BOOL otherCompleted = NO;

void (^sendNext)(void) = ^{

@synchronized (disposable) {

if (lastSelfValue == nil || lastOtherValue == nil) return;

[subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)];

}

};

RACDisposable *selfDisposable = [self subscribeNext:^(id x) {

@synchronized (disposable) {

lastSelfValue = x ?: RACTupleNil.tupleNil;

sendNext();

}

} error:^(NSError *error) {

[subscriber sendError:error];

} completed:^{

@synchronized (disposable) {

selfCompleted = YES;

if (otherCompleted) [subscriber sendCompleted];

}

}];

[disposable addDisposable:selfDisposable];

RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {

@synchronized (disposable) {

lastOtherValue = x ?: RACTupleNil.tupleNil;

sendNext();

}

} error:^(NSError *error) {

[subscriber sendError:error];

} completed:^{

@synchronized (disposable) {

otherCompleted = YES;

if (selfCompleted) [subscriber sendCompleted];

}

}];

[disposable addDisposable:otherDisposable];
return disposable;

}] setNameWithFormat:@"[%@] -combineLatestWith: %@", self.name, signal];
}

在以上代码中,调用了当前信号的subscribeNext方法,同时也调用了需要合并的信号的subscribeNext方法。subscribeNext方法block中调用了sendNext block,这个block是在combineLatestWith中定义,判断两个信号是否已经调用过sendNext,如果都同时掉用过sendNext就会触发combineLatest信号调用didSubscribe block,最终触发订阅combineLatest信号的传入的subscribeNext后的block。

综合上面的分析,类似于以下的使用方式

1
2
3
[[RACSignal combineLatest:@[[RACObserve(self, propertyA) ignore:nil], [RACObserve(self, propertyB) ignore:nil]]] subscribeNext:^(RACTuple *tuple) {

}];

第一次订阅就会触发subscribeNext之后的block逻辑,并且是RACObserve这种类型的combineLatest才会,最上面已经分析了RACObserver生成的信号在第一次订阅调用的时候信号就会调用sendNext。