Exploring GCD Queues

dispatch_object_t

A union of all the GCD object structs; a value of this type can represent any GCD object.

typedef union {
struct _os_object_s *_os_obj;
struct dispatch_object_s *_do;
struct dispatch_continuation_s *_dc;
struct dispatch_queue_s *_dq;
struct dispatch_queue_attr_s *_dqa;
struct dispatch_group_s *_dg;
struct dispatch_source_s *_ds;
struct dispatch_mach_s *_dm;
struct dispatch_mach_msg_s *_dmsg;
struct dispatch_source_attr_s *_dsa;
struct dispatch_semaphore_s *_dsema;
struct dispatch_data_s *_ddata;
struct dispatch_io_s *_dchannel;
struct dispatch_operation_s *_doperation;
struct dispatch_disk_s *_ddisk;
} dispatch_object_t DISPATCH_TRANSPARENT_UNION;
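
Because the union is declared with DISPATCH_TRANSPARENT_UNION, any concrete GCD pointer can be passed where a dispatch_object_t parameter is expected. A minimal sketch of the user-facing effect (plain C with manual retain/release; the queue label is made up for illustration):

#include <dispatch/dispatch.h>

// Sketch: different GCD objects flow through the same generic entry points.
static void transparent_union_demo(void) {
    dispatch_queue_t q = dispatch_queue_create("com.example.demo", NULL);
    dispatch_group_t g = dispatch_group_create();

    dispatch_retain(q);              // dispatch_retain/_release take a dispatch_object_t
    dispatch_release(q);
    dispatch_set_target_queue(g, q); // so does dispatch_set_target_queue's first argument

    dispatch_release(g);
    dispatch_release(q);
}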

_os_object_s

The Objective-C object header: an isa pointer plus internal and external reference counts.

typedef struct _os_object_vtable_s {
	void *_os_obj_objc_class_t[5];
} _os_object_vtable_s;

typedef struct _os_object_s {
	const _os_object_vtable_s *os_obj_isa;
	int volatile os_obj_ref_cnt;
	int volatile os_obj_xref_cnt;
} _os_object_s;

dispatch_object_s

struct dispatch_object_s; 
struct dispatch_object_extra_vtable_s {
	unsigned long const do_type;
	const char *const do_kind;
	void (*const do_invoke)(struct dispatch_object_s *, dispatch_invoke_context_t,
			dispatch_invoke_flags_t);
	void (*const do_push)(struct dispatch_object_s *, dispatch_object_t,
			dispatch_qos_t);
	void (*const do_wakeup)(struct dispatch_object_s *,
			dispatch_qos_t, dispatch_wakeup_flags_t);
	void (*const do_dispose)(struct dispatch_object_s *, bool *allow_free);
	void (*const do_set_targetq)(struct dispatch_object_s *, dispatch_queue_t);
	void (*const do_suspend)(struct dispatch_object_s *);
	void (*const do_resume)(struct dispatch_object_s *,
			bool activate);
	void (*const do_finalize_activation)(struct dispatch_object_s *, bool *allow_resume);
	size_t (*const do_debug)(struct dispatch_object_s *, char *, size_t);
};
typedef struct _os_object_s *_os_object_t;
struct dispatch_object_vtable_s {
void (*_os_obj_xref_dispose)(_os_object_t);
void (*_os_obj_dispose)(_os_object_t);
struct dispatch_object_extra_vtable_s _os_obj_vtable;
};

struct dispatch_object_s {
struct _os_object_s _as_os_obj[0];

const struct dispatch_object_vtable_s *do_vtable;
int volatile do_ref_cnt;
int volatile do_xref_cnt;

struct dispatch_object_s *volatile do_next;
struct dispatch_queue_s *do_targetq;
void *do_ctxt;
void *do_finalizer;
};

dispatch_continuation_s

typedef struct voucher_s {
struct voucher_vtable_s *os_obj_isa;
int volatile os_obj_ref_cnt;
int volatile os_obj_xref_cnt;
struct voucher_hash_entry_s {
uintptr_t vhe_next;
uintptr_t vhe_prev_ptr;
} v_list;
mach_voucher_t v_kvoucher, v_ipc_kvoucher; // if equal, only one reference
voucher_t v_kvbase; // if non-NULL, v_kvoucher is a borrowed reference
firehose_activity_id_t v_activity;
uint64_t v_activity_creator;
firehose_activity_id_t v_parent_activity;
_voucher_priority_t v_priority;
unsigned int v_kv_has_importance:1;
#if VOUCHER_ENABLE_RECIPE_OBJECTS
size_t v_recipe_extra_offset;
mach_voucher_attr_recipe_size_t v_recipe_extra_size;
#endif
} voucher_s;

typedef struct dispatch_continuation_s {
struct dispatch_object_s _as_do[0];
union {
const void *do_vtable;
uintptr_t dc_flags;
};
union {
pthread_priority_t dc_priority;
int dc_cache_cnt;
uintptr_t dc_pad;
};
struct dispatch_continuation_s *volatile do_next;
struct voucher_s *dc_voucher;
dispatch_function_t dc_func;
void *dc_ctxt;
void *dc_data;
void *dc_other;
} *dispatch_continuation_t;

dispatch_queue_s

struct dispatch_queue_s {
struct os_mpsc_queue_s _as_oq[0];
struct dispatch_object_s _as_do[0];
struct _os_object_s _as_os_obj[0];
const struct dispatch_queue_vtable_s *do_vtable;
int volatile do_ref_cnt;
int volatile do_xref_cnt;
OS_OBJECT_STRUCT_HEADER(dispatch_queue);
struct dispatch_queue_s *volatile do_next;
struct dispatch_queue_s *do_targetq;
void *do_ctxt;
void *do_finalizer;
_OS_MPSC_QUEUE_FIELDS(dq, dq_state);
uint32_t dq_side_suspend_cnt;
dispatch_unfair_lock_s dq_sidelock;
union {
dispatch_queue_t dq_specific_q;
struct dispatch_source_refs_s *ds_refs;
struct dispatch_timer_source_refs_s *ds_timer_refs;
struct dispatch_mach_recv_refs_s *dm_recv_refs;
};
DISPATCH_UNION_LE(uint32_t volatile dq_atomic_flags,
const uint16_t dq_width,
const uint16_t __dq_opaque
);
char _dq_pad[DISPATCH_QUEUE_CACHELINE_PAD]; // for static queues only
} __attribute__((aligned(8)));

dispatch_queue_attr_s

struct dispatch_queue_attr_s {
const struct dispatch_queue_attr_vtable_s *do_vtable;
int volatile do_ref_cnt;
int volatile do_xref_cnt;
dispatch_priority_requested_t dqa_qos_and_relpri;
uint16_t dqa_overcommit:2; // whether the queue may spawn more worker threads than there are physical CPUs
uint16_t dqa_autorelease_frequency:2;
uint16_t dqa_concurrent:1;
uint16_t dqa_inactive:1;
};

dispatch_group_s

struct dispatch_group_s {
struct dispatch_object_s _as_do[0];
struct _os_object_s _as_os_obj[0];
const struct dispatch_group_vtable_s *do_vtable;
int volatile do_ref_cnt;
int volatile do_xref_cnt;
struct dispatch_group_s *volatile do_next;
struct dispatch_queue_s *do_targetq;
void *do_ctxt;
void *do_finalizer;
long volatile dg_value;
_dispatch_sema4_t dg_sema;
int volatile dg_waiters;
struct dispatch_continuation_s *volatile dg_notify_head;
struct dispatch_continuation_s *volatile dg_notify_tail;
};

dispatch_source_s

struct dispatch_source_s {
struct dispatch_queue_s _as_dq[0];
struct os_mpsc_queue_s _as_oq[0];
struct dispatch_object_s _as_do[0];
struct _os_object_s _as_os_obj[0];
const struct dispatch_source_vtable_s *do_vtable;
int volatile do_ref_cnt;
int volatile do_xref_cnt;
struct dispatch_source_s *volatile do_next;
struct dispatch_queue_s *do_targetq;
void *do_ctxt;
void *do_finalizer;
_OS_MPSC_QUEUE_FIELDS(dq, dq_state);
uint32_t dq_side_suspend_cnt;
dispatch_unfair_lock_s dq_sidelock;
union {
dispatch_queue_t dq_specific_q;
struct dispatch_source_refs_s *ds_refs;
struct dispatch_timer_source_refs_s *ds_timer_refs;
struct dispatch_mach_recv_refs_s *dm_recv_refs;
};
DISPATCH_UNION_LE(uint32_t volatile dq_atomic_flags,
const uint16_t dq_width,
const uint16_t __dq_opaque
);
unsigned int
ds_is_installed:1,
dm_needs_mgr:1,
dm_connect_handler_called:1,
dm_uninstalled:1,
dm_cancel_handler_called:1,
dm_is_xpc:1;
uint64_t ds_data DISPATCH_ATOMIC64_ALIGN;
uint64_t ds_pending_data DISPATCH_ATOMIC64_ALIGN;
} __attribute__((aligned(8)));

dispatch_semaphore_s

struct dispatch_semaphore_s {
struct dispatch_object_s _as_do[0];
struct _os_object_s _as_os_obj[0];
const struct dispatch_semaphore_vtable_s *do_vtable;
int volatile do_ref_cnt;
int volatile do_xref_cnt;
struct dispatch_semaphore_s *volatile do_next;
struct dispatch_queue_s *do_targetq;
void *do_ctxt;
void *do_finalizer;
long volatile dsema_value;
_dispatch_sema4_t dsema_sema;
long dsema_orig;
};

dispatch_data_s

typedef void (^dispatch_block_t)(void);

struct dispatch_data_s;
typedef struct dispatch_data_s *dispatch_data_t;
typedef struct range_record_s {
dispatch_data_t data_object;
size_t from;
size_t length;
} range_record;

struct dispatch_data_s {
#if DISPATCH_DATA_IS_BRIDGED_TO_NSDATA
const void *do_vtable;
dispatch_queue_t do_targetq;
void *ctxt;
void *finalizer;
#else
DISPATCH_OBJECT_HEADER(data);
#endif // DISPATCH_DATA_IS_BRIDGED_TO_NSDATA
const void *buf;
dispatch_block_t destructor;
size_t size, num_records;
range_record records[0];
};

dispatch_sync_context_s

typedef struct dispatch_sync_context_s {
struct dispatch_object_s _as_do[0];
struct dispatch_continuation_s _as_dc[0];
DISPATCH_CONTINUATION_HEADER(continuation);
dispatch_function_t dsc_func;
void *dsc_ctxt;
#if DISPATCH_COCOA_COMPAT
dispatch_thread_frame_s dsc_dtf;
#endif
dispatch_thread_event_s dsc_event;
dispatch_tid dsc_waiter;
dispatch_qos_t dsc_override_qos_floor;
dispatch_qos_t dsc_override_qos;
bool dsc_wlh_was_first;
bool dsc_release_storage;
} *dispatch_sync_context_t;

dispatch queue

struct dispatch_queue_s _dispatch_root_queues[] = {
#define _DISPATCH_ROOT_QUEUE_IDX(n, flags) \
((flags & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) ? \
DISPATCH_ROOT_QUEUE_IDX_##n##_QOS_OVERCOMMIT : \
DISPATCH_ROOT_QUEUE_IDX_##n##_QOS)
#define _DISPATCH_ROOT_QUEUE_ENTRY(n, flags, ...) \
[_DISPATCH_ROOT_QUEUE_IDX(n, flags)] = { \
DISPATCH_GLOBAL_OBJECT_HEADER(queue_root), \
.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
.do_ctxt = &_dispatch_root_queue_contexts[ \
_DISPATCH_ROOT_QUEUE_IDX(n, flags)], \
.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), \
.dq_priority = _dispatch_priority_make(DISPATCH_QOS_##n, 0) | flags | \
DISPATCH_PRIORITY_FLAG_ROOTQUEUE | \
((flags & DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE) ? 0 : \
DISPATCH_QOS_##n << DISPATCH_PRIORITY_OVERRIDE_SHIFT), \
__VA_ARGS__ \
}
_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, 0,
.dq_label = "com.apple.root.maintenance-qos",
.dq_serialnum = 4,
),
_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.maintenance-qos.overcommit",
.dq_serialnum = 5,
),
_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, 0,
.dq_label = "com.apple.root.background-qos",
.dq_serialnum = 6,
),
_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.background-qos.overcommit",
.dq_serialnum = 7,
),
_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, 0,
.dq_label = "com.apple.root.utility-qos",
.dq_serialnum = 8,
),
_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.utility-qos.overcommit",
.dq_serialnum = 9,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE,
.dq_label = "com.apple.root.default-qos",
.dq_serialnum = 10,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.default-qos.overcommit",
.dq_serialnum = 11,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, 0,
.dq_label = "com.apple.root.user-initiated-qos",
.dq_serialnum = 12,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.user-initiated-qos.overcommit",
.dq_serialnum = 13,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, 0,
.dq_label = "com.apple.root.user-interactive-qos",
.dq_serialnum = 14,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.user-interactive-qos.overcommit",
.dq_serialnum = 15,
),
};

The system maintains 12 root queues, split into two groups: DISPATCH_ROOT_QUEUE_IDX_XXX_QOS and DISPATCH_ROOT_QUEUE_IDX_XXX_QOS_OVERCOMMIT.

Fetching a root queue

static inline dispatch_queue_t
_dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit) {
if (unlikely(qos == DISPATCH_QOS_UNSPECIFIED || qos > DISPATCH_QOS_MAX)) {
DISPATCH_CLIENT_CRASH(qos, "Corrupted priority");
}
return &_dispatch_root_queues[2 * (qos - 1) + overcommit];
}
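
A quick worked example of the index arithmetic, assuming the usual QoS enum values for this libdispatch version (DISPATCH_QOS_MAINTENANCE == 1 up to DISPATCH_QOS_USER_INTERACTIVE == 6):

// index = 2 * (qos - 1) + overcommit
// _dispatch_get_root_queue(DISPATCH_QOS_MAINTENANCE, false) -> index 0, "com.apple.root.maintenance-qos"
// _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false)     -> index 6, "com.apple.root.default-qos"
// _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true)      -> index 7, "com.apple.root.default-qos.overcommit"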

main/mgr/root queue init

void libdispatch_init(void) {
...
// configure the main thread's priority

#if HAVE_PTHREAD_WORKQUEUE_QOS
dispatch_qos_t qos = _dispatch_qos_from_qos_class(qos_class_main());
dispatch_priority_t pri = _dispatch_priority_make(qos, 0);
_dispatch_main_q.dq_priority = _dispatch_priority_with_override_qos(pri, qos);
#if DISPATCH_DEBUG
if (!slowpath(getenv("LIBDISPATCH_DISABLE_SET_QOS"))) {
_dispatch_set_qos_class_enabled = 1;
}
#endif
#endif

#if DISPATCH_USE_THREAD_LOCAL_STORAGE
_dispatch_thread_key_create(&__dispatch_tsd_key, _libdispatch_tsd_cleanup);
#else
_dispatch_thread_key_create(&dispatch_priority_key, NULL);
_dispatch_thread_key_create(&dispatch_r2k_key, NULL);
_dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
_dispatch_thread_key_create(&dispatch_frame_key, _dispatch_frame_cleanup);
_dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
_dispatch_thread_key_create(&dispatch_context_key, _dispatch_context_cleanup);
_dispatch_thread_key_create(&dispatch_pthread_root_queue_observer_hooks_key,
NULL);
_dispatch_thread_key_create(&dispatch_basepri_key, NULL);
#if DISPATCH_INTROSPECTION
_dispatch_thread_key_create(&dispatch_introspection_key , NULL);
#elif DISPATCH_PERF_MON
_dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
#endif
_dispatch_thread_key_create(&dispatch_wlh_key, _dispatch_wlh_cleanup);
_dispatch_thread_key_create(&dispatch_voucher_key, _voucher_thread_cleanup);
_dispatch_thread_key_create(&dispatch_deferred_items_key,
_dispatch_deferred_items_cleanup);
#endif

/* point the main queue's target at the DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT root queue */

#if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
_dispatch_main_q.do_targetq = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT];
#endif

_dispatch_queue_set_current(&_dispatch_main_q);
_dispatch_queue_set_bound_thread(&_dispatch_main_q);

#if DISPATCH_USE_PTHREAD_ATFORK
(void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
dispatch_atfork_parent, dispatch_atfork_child));
#endif
_dispatch_hw_config_init();
_dispatch_time_init() = {
#if DISPATCH_USE_HOST_TIME
mach_timebase_info_data_t tbi;
(void)dispatch_assume_zero(mach_timebase_info(&tbi));
_dispatch_host_time_init(&tbi);
#endif // DISPATCH_USE_HOST_TIME
};
_dispatch_vtable_init();
_os_object_init();
_voucher_init();
_dispatch_introspection_init() = {
TAILQ_INSERT_TAIL(&_dispatch_introspection.queues,
&_dispatch_main_q, diq_list); // register the main queue
TAILQ_INSERT_TAIL(&_dispatch_introspection.queues,
&_dispatch_mgr_q, diq_list); // register the manager queue
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
TAILQ_INSERT_TAIL(&_dispatch_introspection.queues,
_dispatch_mgr_q.do_targetq, diq_list);
#endif
/* register the root queues */
for (size_t i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
TAILQ_INSERT_TAIL(&_dispatch_introspection.queues,
&_dispatch_root_queues[i], diq_list);
}

_dispatch_introspection.debug_queue_inversions =
_dispatch_getenv_bool("LIBDISPATCH_DEBUG_QUEUE_INVERSIONS", false);

// Hack to determine queue TSD offset from start of pthread structure
uintptr_t thread = _dispatch_thread_self();
thread_identifier_info_data_t tiid;
mach_msg_type_number_t cnt = THREAD_IDENTIFIER_INFO_COUNT;
kern_return_t kr = thread_info(pthread_mach_thread_np((void*)thread),
THREAD_IDENTIFIER_INFO, (thread_info_t)&tiid, &cnt);
if (!dispatch_assume_zero(kr)) {
_dispatch_introspection.thread_queue_offset =
(void*)(uintptr_t)tiid.dispatch_qaddr - (void*)thread;
}
_dispatch_thread_key_create(&dispatch_introspection_key,
_dispatch_introspection_thread_remove);
_dispatch_introspection_thread_add(); // add main thread
};
}

global queue

dispatch_queue_t dispatch_get_global_queue(long priority, unsigned long flags) {
if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
return DISPATCH_BAD_INPUT;
}
dispatch_qos_t qos = _dispatch_qos_from_queue_priority(priority);
#if !HAVE_PTHREAD_WORKQUEUE_QOS
if (qos == QOS_CLASS_MAINTENANCE) {
qos = DISPATCH_QOS_BACKGROUND;
} else if (qos == QOS_CLASS_USER_INTERACTIVE) {
qos = DISPATCH_QOS_USER_INITIATED;
}
#endif
if (qos == DISPATCH_QOS_UNSPECIFIED) {
return DISPATCH_BAD_INPUT;
}
return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
}

So a global queue is simply one of the root queues above; the overcommit variant is returned only when the DISPATCH_QUEUE_OVERCOMMIT flag is passed.
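
For instance, asking for the default priority hands back one of the entries listed above, which can be checked through its label (a small sketch; the printed string is expected to match the table for the version studied here):

#include <dispatch/dispatch.h>
#include <stdio.h>

static void global_queue_demo(void) {
    dispatch_queue_t q = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
    printf("%s\n", dispatch_queue_get_label(q)); // expected: "com.apple.root.default-qos"
}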

main queue

struct dispatch_queue_s _dispatch_main_q = {
DISPATCH_GLOBAL_OBJECT_HEADER(queue_main),
/* the main queue's target is the DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT root queue */
#if !DISPATCH_USE_RESOLVERS
.do_targetq = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
#endif
.dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
DISPATCH_QUEUE_ROLE_BASE_ANON,
.dq_label = "com.apple.main-thread",
.dq_atomic_flags = DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC | DQF_WIDTH(1),
.dq_serialnum = 1,
};

dispatch_queue_t dispatch_get_main_queue(void) {
return DISPATCH_GLOBAL_OBJECT(dispatch_queue_t, _dispatch_main_q); // simply returns the global _dispatch_main_q
}

So the main queue is a global static queue whose target queue is, again, one of the root queues.
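
A small sketch to confirm this from user code (the label string comes from the definition above):

#include <dispatch/dispatch.h>
#include <assert.h>
#include <string.h>

static void main_queue_demo(void) {
    dispatch_queue_t mq = dispatch_get_main_queue();
    // The returned pointer is the global _dispatch_main_q.
    assert(strcmp(dispatch_queue_get_label(mq), "com.apple.main-thread") == 0);
}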

user queue

#define DISPATCH_TARGET_QUEUE_DEFAULT NULL

dispatch_queue_t dispatch_queue_create(const char *label, dispatch_queue_attr_t attr) {
return _dispatch_queue_create_with_target(label, attr,
DISPATCH_TARGET_QUEUE_DEFAULT, true);
}

static dispatch_queue_t _dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
dispatch_queue_t tq, bool legacy) {
if (!slowpath(dqa)) {
dqa = _dispatch_get_default_queue_attr(); // no attributes supplied: fall back to the defaults
/*
dqa.dispatch_priority_requested_t dqa_qos_and_relpri = 0;
dqa.dqa_overcommit = _dispatch_queue_attr_overcommit_unspecified;
dqa.dqa_autorelease_frequency = DISPATCH_AUTORELEASE_FREQUENCY_INHERIT;
dqa.dqa_concurrent = 0;
dqa.dqa_inactive = 0;
*/
} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}

//
// Step 1: Normalize arguments (qos, overcommit, tq)
//

dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri); // read the requested QoS
#if !HAVE_PTHREAD_WORKQUEUE_QOS
if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
qos = DISPATCH_QOS_USER_INITIATED;
}
if (qos == DISPATCH_QOS_MAINTENANCE) {
qos = DISPATCH_QOS_BACKGROUND;
}
#endif // !HAVE_PTHREAD_WORKQUEUE_QOS
/*
Resolve the overcommit setting:
- an explicit overcommit attribute cannot be combined with a non-global target queue;
- if the target queue's priority carries the overcommit flag, overcommit is enabled, otherwise disabled;
- with no explicit setting, serial queues default to overcommit enabled and concurrent queues to disabled.
*/
_dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
if (tq->do_targetq) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
"a non-global target queue");
}
}

if (tq && !tq->do_targetq &&
tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
// Handle discrepancies between attr and target queue, attributes win
if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
overcommit = _dispatch_queue_attr_overcommit_enabled;
} else {
overcommit = _dispatch_queue_attr_overcommit_disabled;
}
}
if (qos == DISPATCH_QOS_UNSPECIFIED) {
dispatch_qos_t tq_qos = _dispatch_priority_qos(tq->dq_priority);
tq = _dispatch_get_root_queue(tq_qos,
overcommit == _dispatch_queue_attr_overcommit_enabled);
} else {
tq = NULL;
}
} else if (tq && !tq->do_targetq) {
// target is a pthread or runloop root queue, setting QoS or overcommit
// is disallowed
if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
"and use this kind of target queue");
}
if (qos != DISPATCH_QOS_UNSPECIFIED) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
"and use this kind of target queue");
}
} else {
if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
// Serial queues default to overcommit!
overcommit = dqa->dqa_concurrent ?
_dispatch_queue_attr_overcommit_disabled :
_dispatch_queue_attr_overcommit_enabled;
}
}
// no target resolved yet: take the root queue for the requested QoS (DEFAULT, i.e. com.apple.root.default-qos, when unspecified), honoring the overcommit setting
if (!tq) {
tq = _dispatch_get_root_queue(
qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
overcommit == _dispatch_queue_attr_overcommit_enabled);
if (slowpath(!tq)) {
DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
}
}

//
// Step 2: Initialize the queue
//

if (legacy) {
// if any of these attributes is specified, use non legacy classes
if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
legacy = false;
}
}

// pick the queue's vtable

const void *vtable;
dispatch_queue_flags_t dqf = 0;
if (legacy) {
vtable = DISPATCH_VTABLE(queue);
} else if (dqa->dqa_concurrent) {
vtable = DISPATCH_VTABLE(queue_concurrent);
} else {
vtable = DISPATCH_VTABLE(queue_serial);
}
switch (dqa->dqa_autorelease_frequency) {
case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
dqf |= DQF_AUTORELEASE_NEVER;
break;
case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
dqf |= DQF_AUTORELEASE_ALWAYS;
break;
}
if (legacy) {
dqf |= DQF_LEGACY;
}
if (label) {
const char *tmp = _dispatch_strdup_if_mutable(label);
if (tmp != label) {
dqf |= DQF_LABEL_NEEDS_FREE;
label = tmp;
}
}
/*
Allocate the queue with the vtable chosen above.
A serial queue gets width 1 (at most one item executes at a time);
a concurrent queue gets DISPATCH_QUEUE_WIDTH_MAX.
The new queue's target queue is the root queue resolved earlier.
*/
dispatch_queue_t dq = _dispatch_object_alloc(vtable,
sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
_dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
(dqa->dqa_inactive ? DISPATCH_QUEUE_INACTIVE : 0));

dq->dq_label = label;
#if HAVE_PTHREAD_WORKQUEUE_QOS
dq->dq_priority = dqa->dqa_qos_and_relpri;
if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
}
#endif
_dispatch_retain(tq);
if (qos == QOS_CLASS_UNSPECIFIED) {
// legacy way of inherithing the QoS from the target
_dispatch_queue_priority_inherit_from_target(dq, tq);
}
if (!dqa->dqa_inactive) {
_dispatch_queue_inherit_wlh_from_target(dq, tq);
}
dq->do_targetq = tq;
_dispatch_object_debug(dq, "%s", __func__);
return _dispatch_introspection_queue_create(dq);
}
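
To make the width and overcommit resolution above concrete, here is a sketch of creating both kinds of queues (labels and QoS choice are illustrative):

#include <dispatch/dispatch.h>

static void create_queues_demo(void) {
    // Serial queue: width 1; per the logic above it defaults to an overcommit root queue.
    dispatch_queue_t serial = dispatch_queue_create("com.example.serial", DISPATCH_QUEUE_SERIAL);

    // Concurrent queue with an explicit QoS: width DISPATCH_QUEUE_WIDTH_MAX;
    // its target is the corresponding non-overcommit root queue.
    dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(
            DISPATCH_QUEUE_CONCURRENT, QOS_CLASS_UTILITY, 0);
    dispatch_queue_t concurrent = dispatch_queue_create("com.example.concurrent", attr);

    dispatch_async(serial, ^{ /* runs one block at a time */ });
    dispatch_async(concurrent, ^{ /* may run alongside other blocks */ });
}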

dispatch sync/async

dispatch sync

void dispatch_sync(dispatch_queue_t dq, dispatch_block_t work) {
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_private_data(dq, work, 0);
}
dispatch_sync_f(dq, work, _dispatch_Block_invoke(work)) = {
// width 1 (serial queue): go straight to dispatch_barrier_sync_f
if (likely(dq->dq_width == 1)) {
return dispatch_barrier_sync_f(dq, ctxt, func);
}

// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE

// global concurrent queues (and queues that cannot reserve sync width) take the _dispatch_sync_f_slow path
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dq))) {
return _dispatch_sync_f_slow(dq, ctxt, func, 0) = {
// no target queue: invoke the function directly on the calling thread
if (unlikely(!dq->do_targetq)) {
return _dispatch_sync_function_invoke(dq, ctxt, func) = {
dispatch_thread_frame_s dtf;
_dispatch_thread_frame_push(&dtf, dq);
_dispatch_client_callout(ctxt, func);
_dispatch_perfmon_workitem_inc();
_dispatch_thread_frame_pop(&dtf);
};
}
// otherwise enqueue a sync waiter on the queue and block until it is this caller's turn to run
_dispatch_sync_wait(dq, ctxt, func, dc_flags, dq, dc_flags) = {
// ...

_dispatch_introspection_sync_begin(top_dq);
#if DISPATCH_COCOA_COMPAT
if (unlikely(dsc.dsc_func == NULL)) {
// Queue bound to a non-dispatch thread, the continuation already ran
// so just unlock all the things, except for the thread bound queue
dispatch_queue_t bound_dq = dsc.dc_other;
return _dispatch_sync_complete_recurse(top_dq, bound_dq, top_dc_flags);
}
#endif
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags);
};
};
}

_dispatch_introspection_sync_begin(dq);
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dq, ctxt, func, 0);
}
_dispatch_sync_invoke_and_complete(dq, ctxt, func);
};
}
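
A usage sketch of the paths above: on a serial (width 1) queue dispatch_sync takes the barrier route, so synchronously dispatching onto the queue that is currently executing your code deadlocks:

#include <dispatch/dispatch.h>

static void sync_demo(dispatch_queue_t serial) {
    __block int value = 0;
    dispatch_sync(serial, ^{ value = 42; });  // caller blocks until the block has run

    dispatch_async(serial, ^{
        // dispatch_sync(serial, ^{ ... });   // would deadlock: the width-1 queue is
        //                                    // already occupied by this very block
    });
}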

dispatch_barrier_sync adds DISPATCH_BLOCK_BARRIER to the flags before taking the same private-data path:

void dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work) {
if (unlikely(_dispatch_block_has_private_data(work))) {
dispatch_block_flags_t flags = DISPATCH_BLOCK_BARRIER;
return _dispatch_sync_block_with_private_data(dq, work, flags);
}
dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
}

The queued work is executed when the queue is woken up:

void _dispatch_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags) {
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;

if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
// DISPATCH_WAKEUP_BARRIER_COMPLETE: for a serial queue, hand over exactly one dispatch_continuation_t; for a concurrent queue, hand over every non-barrier continuation queued ahead of the next barrier
return _dispatch_queue_barrier_complete(dq, qos, flags) = {
dispatch_continuation_t dc_tmp, dc_start = NULL, dc_end = NULL;
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
struct dispatch_object_s *dc = NULL;
uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
size_t count = 0;

dispatch_assert(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE);

if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
dc = _dispatch_queue_head(dq);
if (!_dispatch_object_is_sync_waiter(dc)) {
// not a slow item, needs to wake up
} else if (likely(dq->dq_width == 1) ||
_dispatch_object_is_barrier(dc)) {
// rdar://problem/8290662 "barrier/writer lock transfer"
dc_start = dc_end = (dispatch_continuation_t)dc;
owned = 0;
count = 1;
dc = _dispatch_queue_next(dq, dc);
} else {
// <rdar://problem/10164594> "reader lock transfer"
// we must not wake waiters immediately because our right
// for dequeuing is granted through holding the full "barrier" width
// which a signaled work item could relinquish out from our feet
dc_start = (dispatch_continuation_t)dc;
do {
// no check on width here because concurrent queues
// do not respect width for blocked readers, the thread
// is already spent anyway
dc_end = (dispatch_continuation_t)dc;
owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
count++;
dc = _dispatch_queue_next(dq, dc);
} while (dc && _dispatch_object_is_sync_waiter_non_barrier(dc));
}

if (count) {
do {
dc_tmp = dc_start;
dc_start = dc_start->do_next;

/* redirect or wake each dispatch_continuation_t via the event loop */

_dispatch_sync_waiter_redirect_or_wake(dq, owned, dc_tmp);
owned = DISPATCH_SYNC_WAITER_NO_UNLOCK;
} while (dc_tmp != dc_end);
if (flags & DISPATCH_WAKEUP_CONSUME_2) {
return _dispatch_release_2_tailcall(dq);
}
return;
}
if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
_dispatch_retain_2(dq);
flags |= DISPATCH_WAKEUP_CONSUME_2;
}
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}

return _dispatch_queue_class_barrier_complete(dq, qos, flags, target,owned);
};
}
if (_dispatch_queue_class_probe(dq)) {
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
// otherwise fall through to the generic class wakeup
return _dispatch_queue_class_wakeup(dq, qos, flags, target);
}

dispatch async

void dispatch_async(dispatch_queue_t dq, dispatch_block_t work) {
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;

_dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags) = {
dc->dc_flags = dc_flags | DISPATCH_OBJ_BLOCK_BIT;
dc->dc_ctxt = _dispatch_Block_copy(work);
_dispatch_continuation_priority_set(dc, pp, flags);

if (unlikely(_dispatch_block_has_private_data(work))) {
// always sets dc_func & dc_voucher
// may update dc_priority & do_vtable
return _dispatch_continuation_init_slow(dc, dqu, flags);
}

if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
// dc_func both invokes the block and releases it afterwards
dc->dc_func = _dispatch_call_block_and_release;
} else {
dc->dc_func = _dispatch_Block_invoke(work);
}
_dispatch_continuation_voucher_set(dc, dqu, flags);
};
_dispatch_continuation_async(dq, dc) = {
_dispatch_continuation_async2(dq, dc,
dc->dc_flags & DISPATCH_OBJ_BARRIER_BIT) = {
if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
// barrier work, or a non-redirecting (width 1) queue: push the continuation onto the queue directly
return _dispatch_continuation_push(dq, dc);
}
return _dispatch_async_f2(dq, dc) = {
// the queue already has pending items: just push
if (slowpath(dq->dq_items_tail)) {
return _dispatch_continuation_push(dq, dc);
}

if (slowpath(!_dispatch_queue_try_acquire_async(dq))) {
return _dispatch_continuation_push(dq, dc);
}
// otherwise redirect the work to the target root queue at the appropriate QoS
return _dispatch_async_f_redirect(dq, dc,
_dispatch_continuation_override_qos(dq, dc));
};
};
};
}

dispatch_barrier_async differs from dispatch_async only in that dc_flags additionally carries DISPATCH_OBJ_BARRIER_BIT:

void dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work) {
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;

_dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
_dispatch_continuation_push(dq, dc);
}
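
The barrier bit is what enables the classic reader/writer pattern on a private concurrent queue; a sketch (queue label illustrative, rw_queue assumed to be created once at startup):

#include <dispatch/dispatch.h>

// Assumed to be initialized once, e.g.:
// rw_queue = dispatch_queue_create("com.example.rw", DISPATCH_QUEUE_CONCURRENT);
static dispatch_queue_t rw_queue;
static int shared_value;

static void writer(int v) {
    // The barrier waits for in-flight readers, runs alone, then lets readers resume.
    dispatch_barrier_async(rw_queue, ^{ shared_value = v; });
}

static int reader(void) {
    __block int v;
    dispatch_sync(rw_queue, ^{ v = shared_value; }); // concurrent readers may overlap
    return v;
}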

dispatch semaphore

A wrapper around the underlying UNIX semaphore.

create

dispatch_semaphore_t dispatch_semaphore_create(long value) {
dispatch_semaphore_t dsema;
// value must be non-negative; it becomes the semaphore's initial (and original) count

if (value < 0) {
return DISPATCH_BAD_INPUT;
}

// allocate the dispatch_semaphore_t

dsema = (dispatch_semaphore_t)_dispatch_object_alloc(
DISPATCH_VTABLE(semaphore), sizeof(struct dispatch_semaphore_s)) = {
return _os_object_alloc_realized(vtable, size) = {
return _os_objc_alloc(cls, size) = {
id obj;
size -= sizeof(((struct _os_object_s *)NULL)->os_obj_isa);
while (!fastpath(obj = class_createInstance(cls, size))) {
_dispatch_temporary_resource_shortage();
}
return obj;
};
};
};
_dispatch_semaphore_class_init(value, dsema) = {
struct dispatch_semaphore_header_s *dsema = dsemau._dsema_hdr;

// the target queue is the non-overcommit DEFAULT-QoS root queue (com.apple.root.default-qos)
// initialize the underlying _dispatch_sema4_t
dsema->do_next = DISPATCH_OBJECT_LISTLESS;
dsema->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false);
dsema->dsema_value = value;
_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
};
dsema->dsema_orig = value;
return dsema;
}

wait

long dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout) {
// decrement the value; if it drops below zero, take the slow path and wait
long value = os_atomic_dec2o(dsema, dsema_value, acquire);
if (fastpath(value >= 0)) {
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout) = {
long orig;
/*
Create the underlying _dispatch_sema4_t, then branch on timeout:
- a finite timeout uses a timed wait, falling through on expiry to undo the decrement;
- DISPATCH_TIME_NOW undoes the decrement and reports a timeout immediately;
- DISPATCH_TIME_FOREVER blocks in the underlying UNIX wait.
*/
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
while (orig < 0) {
if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
&orig, relaxed)) {
return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema) = {
int ret = sem_wait(sema);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
};
break;
}
return 0;
};
}

signal

long dispatch_semaphore_signal(dispatch_semaphore_t dsema) {
// increment the value; if it is now positive, no one is waiting and there is nothing to do
// otherwise wake a waiter via the underlying sem_post
long value = os_atomic_inc2o(dsema, dsema_value, release);
if (fastpath(value > 0)) {
return 0;
}
if (slowpath(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema) = {
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
_dispatch_sema4_signal(&dsema->dsema_sema, 1) = {
do {
int ret = sem_post(sema);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
} while (--count);
};
return 1;
};
}
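
Typical usage built directly on these two operations: a counting semaphore that caps how many blocks run at once (the limit of 4 is arbitrary):

#include <dispatch/dispatch.h>

static void bounded_concurrency_demo(void) {
    dispatch_semaphore_t sema = dispatch_semaphore_create(4);  // at most 4 in flight
    dispatch_queue_t q = dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0);

    for (int i = 0; i < 100; i++) {
        dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER);  // dsema_value--
        dispatch_async(q, ^{
            /* do some work */
            dispatch_semaphore_signal(sema);                   // dsema_value++
        });
    }
}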

dispatch group

dispatch groups are built on top of dispatch semaphores.

create

dispatch_group_t dispatch_group_create(void) {
return _dispatch_group_create_with_count(0) = {
// allocate the dispatch_group_t and initialize its embedded semaphore state
dispatch_group_t dg = (dispatch_group_t)_dispatch_object_alloc(
DISPATCH_VTABLE(group), sizeof(struct dispatch_group_s));
_dispatch_semaphore_class_init(count, dg);
if (count) {
os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://problem/22318411>
}
return dg;
};
}

group async

void dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db) {
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_GROUP_BIT;

_dispatch_continuation_init(dc, dq, db, 0, 0, dc_flags);
_dispatch_continuation_group_async(dg, dq, dc) = {
dispatch_group_enter(dg);
dc->dc_data = dg;
_dispatch_continuation_async(dq, dc); // same as dispatch_async from here on
};
}

// simply increments the count of outstanding work items
void dispatch_group_enter(dispatch_group_t dg) {
long value = os_atomic_inc_orig2o(dg, dg_value, acquire);
if (slowpath((unsigned long)value >= (unsigned long)LONG_MAX)) {
DISPATCH_CLIENT_CRASH(value,
"Too many nested calls to dispatch_group_enter()");
}
if (value == 0) {
_dispatch_retain(dg); // <rdar://problem/22318411>
}
}


// decrements the outstanding count; when it reaches zero, wakes the group and releases resources
void dispatch_group_leave(dispatch_group_t dg) {
long value = os_atomic_dec2o(dg, dg_value, release);
if (slowpath(value == 0)) {
return (void)_dispatch_group_wake(dg, true);
}
if (slowpath(value < 0)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_group_leave()");
}
}
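
enter/leave are the manual counterpart to dispatch_group_async, useful when the work finishes through a callback rather than a block the group submits itself (fetch_async below is a hypothetical callback-based API used only for illustration):

#include <dispatch/dispatch.h>

// Hypothetical asynchronous API with a completion callback.
extern void fetch_async(const char *url, void (^completion)(void));

static void enter_leave_demo(dispatch_group_t group) {
    dispatch_group_enter(group);          // dg_value++
    fetch_async("https://example.com", ^{
        /* handle the result */
        dispatch_group_leave(group);      // dg_value--; wakes the group when it hits zero
    });
}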

wait

long dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout) {
if (dg->dg_value == 0) {
return 0;
}
if (timeout == 0) {
return _DSEMA4_TIMEOUT();
}
return _dispatch_group_wait_slow(dg, timeout) = {
long value;
int orig_waiters;

// check before we cause another signal to be sent by incrementing
// dg->dg_waiters
value = os_atomic_load2o(dg, dg_value, ordered); // 19296565
if (value == 0) {
return _dispatch_group_wake(dg, false) = {
dispatch_continuation_t next, head, tail = NULL;
long rval;

// cannot use os_mpsc_capture_snapshot() because we can have concurrent
// _dispatch_group_wake() calls
head = os_atomic_xchg2o(dg, dg_notify_head, NULL, relaxed);
if (head) {
// snapshot before anything is notified/woken <rdar://problem/8554546>
tail = os_atomic_xchg2o(dg, dg_notify_tail, NULL, release);
}

// if there are waiters, wake them by signaling the semaphore; the notify blocks queued below are then submitted asynchronously
rval = (long)os_atomic_xchg2o(dg, dg_waiters, 0, relaxed);
if (rval) {
// wake group waiters
_dispatch_sema4_create(&dg->dg_sema, _DSEMA4_POLICY_FIFO);
_dispatch_sema4_signal(&dg->dg_sema, rval);
}
uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
if (head) {
// async group notify blocks
do {
next = os_mpsc_pop_snapshot_head(head, tail, do_next);
dispatch_queue_t dsn_queue = (dispatch_queue_t)head->dc_data;
_dispatch_continuation_async(dsn_queue, head);
_dispatch_release(dsn_queue);
} while ((head = next));
refs++;
}
if (refs) _dispatch_release_n(dg, refs);
return 0;
};
}

/*
Increment the waiter count and re-check the value (waking the group if it already hit zero),
then wait on the semaphore: a timed wait for a finite timeout,
undoing the waiter count on DISPATCH_TIME_NOW,
or blocking until signaled for DISPATCH_TIME_FOREVER.
*/

(void)os_atomic_inc2o(dg, dg_waiters, relaxed);
// check the values again in case we need to wake any threads
value = os_atomic_load2o(dg, dg_value, ordered); // 19296565
if (value == 0) {
_dispatch_group_wake(dg, false);
// Fall through to consume the extra signal, forcing timeout to avoid
// useless setups as it won't block
timeout = DISPATCH_TIME_FOREVER;
}

_dispatch_sema4_create(&dg->dg_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:
if (!_dispatch_sema4_timedwait(&dg->dg_sema, timeout)) {
break;
}
// Fall through and try to undo the earlier change to
// dg->dg_waiters
case DISPATCH_TIME_NOW:
orig_waiters = dg->dg_waiters;
while (orig_waiters) {
if (os_atomic_cmpxchgvw2o(dg, dg_waiters, orig_waiters,
orig_waiters - 1, &orig_waiters, relaxed)) {
return _DSEMA4_TIMEOUT();
}
}
// Another thread is running _dispatch_group_wake()
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dg->dg_sema);
break;
}
return 0;
};
}

notify

static inline void _dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dsn) {
// enqueue the notify block; if the outstanding count is already zero, wake the group immediately
dsn->dc_data = dq;
dsn->do_next = NULL;
_dispatch_retain(dq);
if (os_mpsc_push_update_tail(dg, dg_notify, dsn, do_next)) {
_dispatch_retain(dg);
os_atomic_store2o(dg, dg_notify_head, dsn, ordered);
// seq_cst with atomic store to notify_head <rdar://problem/11750916>
if (os_atomic_load2o(dg, dg_value, ordered) == 0) {
_dispatch_group_wake(dg, false);
}
}
}
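
Putting it together: dispatch_group_notify queues a block that _dispatch_group_wake submits once the outstanding count drops back to zero, which avoids blocking the way dispatch_group_wait does. A sketch:

#include <dispatch/dispatch.h>
#include <stdio.h>

static void group_notify_demo(void) {
    dispatch_group_t group = dispatch_group_create();
    dispatch_queue_t q = dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0);

    dispatch_group_async(group, q, ^{ /* task A */ });
    dispatch_group_async(group, q, ^{ /* task B */ });

    dispatch_group_notify(group, dispatch_get_main_queue(), ^{
        printf("both tasks finished\n");  // runs once dg_value is back to zero
    });
}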

dispatch once

void dispatch_once(dispatch_once_t *val, dispatch_block_t block) {
dispatch_once_f(val, block, _dispatch_Block_invoke(block)) = {
// only the first caller runs dispatch_once_f; later callers see the predicate already set and skip it
if (DISPATCH_EXPECT(*predicate, ~0l) != ~0l) {
dispatch_once_f(predicate, context, function);
} else {
dispatch_compiler_barrier();
}
DISPATCH_COMPILER_CAN_ASSUME(*predicate == ~0l);
};
}
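
The canonical use is one-time initialization, for example a lazily created shared queue (names illustrative):

#include <dispatch/dispatch.h>

static dispatch_queue_t shared_queue(void) {
    static dispatch_once_t once;
    static dispatch_queue_t queue;
    dispatch_once(&once, ^{
        // Runs exactly once; later callers see the predicate set to ~0l and skip it.
        queue = dispatch_queue_create("com.example.shared", DISPATCH_QUEUE_SERIAL);
    });
    return queue;
}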

dispatch source

A common example is shown below; the advantage of a GCD timer is that it does not depend on a runloop.

dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);  
dispatch_source_set_timer(timer, dispatch_walltime(NULL, 0), 10*NSEC_PER_SEC, 1*NSEC_PER_SEC); // fire every 10 seconds, with 1 second of leeway
dispatch_source_set_event_handler(timer, ^{
// block executed each time the timer fires
});
dispatch_resume(timer);

create

dispatch_source_t dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
unsigned long mask, dispatch_queue_t dq) {
dispatch_source_refs_t dr;
dispatch_source_t ds;

dr = dux_create(dst, handle, mask)._dr;
if (unlikely(!dr)) {
return DISPATCH_BAD_INPUT;
}

ds = _dispatch_object_alloc(DISPATCH_VTABLE(source),
sizeof(struct dispatch_source_s));
// Initialize as a queue first, then override some settings below.
_dispatch_queue_init(ds->_as_dq, DQF_LEGACY, 1,
DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER);
ds->dq_label = "source";
ds->do_ref_cnt++; // extra reference held on behalf of the manager queue
ds->ds_refs = dr;
dr->du_owner_wref = _dispatch_ptr2wref(ds);

if (slowpath(!dq)) {
dq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
} else {
_dispatch_retain((dispatch_queue_t _Nonnull)dq);
}
ds->do_targetq = dq;
if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_INTERVAL)) {
_dispatch_source_set_interval(ds, handle) = {
#define NSEC_PER_FRAME (NSEC_PER_SEC/60)
// approx 1 year (60s * 60m * 24h * 365d)
#define FOREVER_NSEC 31536000000000000ull

dispatch_timer_source_refs_t dr = ds->ds_timer_refs;
const bool animation = dr->du_fflags & DISPATCH_INTERVAL_UI_ANIMATION;
if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
FOREVER_NSEC/NSEC_PER_MSEC))) {
interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
} else {
interval = FOREVER_NSEC;
}
interval = _dispatch_time_nano2mach(interval);
uint64_t target = _dispatch_absolute_time() + interval;
target -= (target % interval);
const uint64_t leeway = animation ?
_dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
dr->dt_timer.target = target;
dr->dt_timer.deadline = target + leeway;
dr->dt_timer.interval = interval;
_dispatch_source_timer_telemetry(ds, DISPATCH_CLOCK_MACH, &dr->dt_timer) = {
if (_dispatch_trace_timer_configure_enabled() ||
_dispatch_source_timer_telemetry_enabled()) {
_dispatch_source_timer_telemetry_slow(ds, clock, values) = {
if (_dispatch_trace_timer_configure_enabled()) {
_dispatch_trace_timer_configure(ds, clock, values) = {
dispatch_timer_source_refs_t dr = ds->ds_timer_refs;
struct dispatch_trace_timer_params_s params;
DISPATCH_TIMER_CONFIGURE(ds, _dispatch_trace_timer_function(dr),
_dispatch_trace_timer_params(clock, values, 0, &params));
};
}
};
asm(""); // prevent tailcall
}
};
};
}
_dispatch_object_debug(ds, "%s", __func__);
return ds;
}

set timer

void dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
uint64_t interval, uint64_t leeway) {
dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
dispatch_timer_config_t dtc;

if (unlikely(!dt->du_is_timer || (dt->du_fflags&DISPATCH_TIMER_INTERVAL))) {
DISPATCH_CLIENT_CRASH(ds, "Attempt to set timer on a non-timer source");
}
// compute the start time, interval, and deadline (start + leeway)
dtc = _dispatch_source_timer_config_create(start, interval, leeway) = {
dispatch_timer_config_t dtc;
dtc = _dispatch_calloc(1ul, sizeof(struct dispatch_timer_config_s));
if (unlikely(interval == 0)) {
if (start != DISPATCH_TIME_FOREVER) {
_dispatch_bug_deprecated("Setting timer interval to 0 requests "
"a 1ns timer, did you mean FOREVER (a one-shot timer)?");
}
interval = 1;
} else if ((int64_t)interval < 0) {
// 6866347 - make sure nanoseconds won't overflow
interval = INT64_MAX;
}
if ((int64_t)leeway < 0) {
leeway = INT64_MAX;
}
if (start == DISPATCH_TIME_NOW) {
start = _dispatch_absolute_time();
} else if (start == DISPATCH_TIME_FOREVER) {
start = INT64_MAX;
}

if ((int64_t)start < 0) {
// wall clock
start = (dispatch_time_t)-((int64_t)start);
dtc->dtc_clock = DISPATCH_CLOCK_WALL;
} else {
// absolute clock
interval = _dispatch_time_nano2mach(interval);
if (interval < 1) {
// rdar://problem/7287561 interval must be at least one in
// in order to avoid later division by zero when calculating
// the missed interval count. (NOTE: the wall clock's
// interval is already "fixed" to be 1 or more)
interval = 1;
}
leeway = _dispatch_time_nano2mach(leeway);
dtc->dtc_clock = DISPATCH_CLOCK_MACH;
}
if (interval < INT64_MAX && leeway > interval / 2) {
leeway = interval / 2;
}

dtc->dtc_timer.target = start;
dtc->dtc_timer.interval = interval;
if (start + leeway < INT64_MAX) {
dtc->dtc_timer.deadline = start + leeway;
} else {
dtc->dtc_timer.deadline = INT64_MAX;
}
return dtc;
};
_dispatch_source_timer_telemetry(ds, dtc->dtc_clock, &dtc->dtc_timer);
dtc = os_atomic_xchg2o(dt, dt_pending_config, dtc, release);
if (dtc) free(dtc);
dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}

set event handler

void dispatch_source_set_event_handler(dispatch_source_t ds,
dispatch_block_t handler) {
dispatch_continuation_t dc;

// wrap the handler in a dispatch_continuation_t and install it on the source
dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true) = {
// sources don't propagate priority by default
const dispatch_block_flags_t flags =
DISPATCH_BLOCK_HAS_PRIORITY | DISPATCH_BLOCK_NO_VOUCHER;
dispatch_continuation_t dc = _dispatch_continuation_alloc();
if (func) {
uintptr_t dc_flags = 0;

if (kind != DS_EVENT_HANDLER) {
dc_flags |= DISPATCH_OBJ_CONSUME_BIT;
}
if (block) {
#ifdef __BLOCKS__
_dispatch_continuation_init(dc, ds, func, 0, flags, dc_flags);
#endif /* __BLOCKS__ */
} else {
dc_flags |= DISPATCH_OBJ_CTXT_FETCH_BIT;
_dispatch_continuation_init_f(dc, ds, ds->do_ctxt, func,
0, flags, dc_flags);
}
_dispatch_trace_continuation_push(ds->_as_dq, dc);
} else {
dc->dc_flags = 0;
dc->dc_func = NULL;
}
return dc;
};
_dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
}

I have not tracked down the remaining details, so exactly how the source machinery is driven is still unclear to me.
