
书接上文,要引出JobQueue,就要看JobBatchDispatcher::ScheduleJobForEachInternal这个重要函数,下面就来分析一下它的调用过程:
// Decompiler pseudocode excerpt; "..." marks lines elided by the article.
// Visible flow: fetch the job queue via GetJobQueue(), build a for-each job
// batch from the callback arguments (a3, a4, a5, a6), then pass the resulting
// group to HandleJobKickInternal together with the fence a2 and the count a5.
void __fastcall JobBatchDispatcher::ScheduleJobForEachInternal(
JobBatchDispatcher *this,
struct JobFence *a2,
void (__stdcall *a3)(void *, unsigned int),
void *a4,
int a5,
void (__stdcall *a6)(void *),
const struct JobFence *a7)
{
...
struct JobQueue *JobQueue; // [rsp+40h] [rbp-68h]
struct JobGroup *v11; // [rsp+48h] [rbp-60h]
...
JobQueue = GetJobQueue();
...
// NOTE(review): v14 and v16 are not declared in this excerpt; presumably they
// are set up in the elided lines (a7 is otherwise unused here) - confirm.
v11 = (struct JobGroup *)JobQueue::CreateForEachJobBatch(JobQueue, a3, a4, (unsigned int)a5, a6, v14, v16);
// Hand the freshly created group over for kicking/scheduling.
JobBatchDispatcher::HandleJobKickInternal(this, JobQueue, a2, v11, a5);
}
整个调用过程如下,可以看出JobSystem主要是依赖于JobQueue才能运行的.
JobBatchDispatcher::HandleJobKickInternal
JobBatchDispatcher::KickJobs
JobQueue::ScheduleGroups
JobQueue::ScheduleGroupInternal
JobQueue::TryExecJobGroup
JobQueue::ExecJobGroup
JobQueue::ExecAll
JobQueue::Exec
我们下面再来看JobSystem初始化的过程.
// Decompiler pseudocode excerpt; "..." marks lines elided by the article.
// The only call the article keeps from this function is
// JobSystem::CreateJobSystem; the function returns 1.
// NOTE(review): the text before "char" on the next line looks like the
// article's section heading fused onto the signature during extraction.
JobSystem初始化char InitializeEngineNoGraphics(void)
{
...
// v3 is not declared in this excerpt.
JobSystem::CreateJobSystem(v3);
...
return 1;
}
JobSystem::CreateJobSystem
通过systeminfo::GetProcessorCount(v3)获得处理器数量,工作线程数的上限为处理器数量减1,为创建线程做准备:
// Decompiler pseudocode excerpt; "..." marks lines elided by the article.
// Visible flow:
//   1. Read a boot-config parameter (the log strings call it
//      "job-worker-count"); -1 means "not specified".
//   2. If specified, validate it against [0, processor count - 1] and log the
//      outcome; values above the maximum are clamped to the maximum.
//   3. Create the job queue with the resulting worker count (v10, still -1 if
//      nothing valid was chosen), then the background job queue and the
//      batched-jobs state.
void __fastcall JobSystem::CreateJobSystem(JobSystem *this)
{
...
systeminfo *v3; // rcx
...
JobSystem *v6; // rcx
...
struct profiling::Marker *v9; // [rsp+48h] [rbp-F0h]
...
// Fill 76 DWORDs starting at &v7 with -858993460 (0xCCCCCCCC).
// NOTE(review): presumably the debug-build uninitialised-stack fill pattern - confirm.
v1 = &v7;
for ( i = 76i64; i; --i )
{
*(_DWORD *)v1 = -858993460;
v1 = (__int64 *)((char *)v1 + 4);
}
v18 = -2i64;
v9 = (struct profiling::Marker *)&unk_18BE65FF0;
profiler_begin((struct profiling::Marker *)&unk_18BE65FF0);
// v10 is the worker count later passed to CreateJobQueue; -1 is the default.
v10 = -1;
if ( (unsigned int)BootConfig::ParameterData::operator[](&unk_18BE66020, 0i64) != -1 )
{
v11 = 0;
// Upper bound for the requested worker count: processor count minus one.
v12 = systeminfo::GetProcessorCount(v3) - 1;
// Out-of-range request: negative, or greater than v12.
if ( (int)BootConfig::ParameterData::operator[](&unk_18BE66020, 0i64) < 0
|| (v4 = BootConfig::ParameterData::operator[](&unk_18BE66020, 0i64), v4 > v12) )
{
v16 = BootConfig::ParameterData::operator[](&unk_18BE66020, 0i64);
// NOTE(review): "LODWORd" looks like a mangled LODWORD decompiler macro - confirm against the original listing.
LODWORd(v8) = v12;
// NOTE(review): the trailing "n" in the format strings below is presumably a "\n" whose backslash was lost in extraction.
v21 = Format(v14, "JobSystem: Invalid job-worker-count value %d must be between %d->%dn", v16, 0i64, v8);
v22 = v21;
// NOTE(review): "DebugStringToFile>>(" appears to have lost its template argument list in extraction.
DebugStringToFile>>(
v21,
(unsigned int)"C:\buildslave\unity\build\Runtime/Jobs/JobSystem.cpp",
39,
-1,
4,
0,
0,
0i64);
core::StringStorageDefault::deallocate(v14);
v5 = BootConfig::ParameterData::operator[](&unk_18BE66020, 0i64);
// Only the "too large" case is clamped; a negative request leaves v10 at -1.
if ( v5 > v12 )
{
v10 = v12;
v17 = BootConfig::ParameterData::operator[](&unk_18BE66020, 0i64);
v23 = Format(v15, "JobSystem: Clamping job-worker-count value %d to %dn", v17, (unsigned int)v12);
v24 = v23;
DebugStringToFile>>(
v23,
(unsigned int)"C:\buildslave\unity\build\Runtime/Jobs/JobSystem.cpp",
43,
-1,
4,
0,
0,
0i64);
core::StringStorageDefault::deallocate(v15);
}
}
else
{
// In-range request: use the configured value as-is and log it.
v10 = BootConfig::ParameterData::operator[](&unk_18BE66020, 0i64);
v19 = Format(v13, "JobSystem: Creating JobQueue using job-worker-count value %dn", (unsigned int)v10);
v20 = v19;
DebugStringToFile>>(
v19,
(unsigned int)"C:\buildslave\unity\build\Runtime/Jobs/JobSystem.cpp",
35,
-1,
4,
0,
0,
0i64);
core::StringStorageDefault::deallocate(v13);
}
}
// Records whether an explicit worker count was chosen (1) or not (0).
dword_18BE66060 = v10 != -1;
CreateJobQueue("Job", "Worker", v10, 1);// Create the JobQueue
CreateBackgroundJobQueue();
InitializeBatchedJobs();
dword_18BA6409C = JobSystem::GetJobQueueMaximumThreadCount(v6);
profiler_end(v9);
}
构造JobQueue结构
如果传入的线程数量为-1,则会提取ThreadConfig::GetJobSchedulerMaxThreads()作为线程数量
// Decompiler pseudocode excerpt; "..." marks lines elided by the article.
// Creates the two global atomic-stack pools, asserts that no queue exists yet,
// resolves and clamps the worker-thread count, then allocates and constructs
// the JobQueue and stores it in qword_18BE66C78.
//   a1, a2 - strings forwarded unchanged to the JobQueue constructor
//   a3     - requested thread count; -1 selects ThreadConfig::GetJobSchedulerMaxThreads()
//   a4     - when non-zero, lowers the thread cap to 112 and sets flag bit 4
void __fastcall CreateJobQueue(const char *a1, const char *a2, int a3, char a4)
{
...
int JobSchedulerMaxThreads; // [rsp+88h] [rbp-20h]
...
...
JobQueue::g_JobGroupPool = CreateAtomicStack();
JobQueue::g_JobInfoPool = CreateAtomicStack();
// qword_18BE66C78 is non-null only if a queue was already created; the
// assertion text refers to it as g_Queue. Break into the debugger if attached.
if ( qword_18BE66C78
&& AssertImplementation(
0,
"C:\buildslave\unity\build\Runtime/Jobs/Internal/JobQueue.cpp",
1886,
-1,
"Assertion failed on expression: 'g_Queue == NULL'")
&& (unsigned __int8)Baselib_Debug_IsDebuggerAttached() )
{
__debugbreak();
}
// v7 is passed as the 4th constructor argument (a4 of JobQueue::JobQueue).
v7 = 2;
if ( a3 == -1 )
JobSchedulerMaxThreads = ThreadConfig::GetJobSchedulerMaxThreads();
else
JobSchedulerMaxThreads = a3;
// Clamp the thread count to [0, 128] ...
v8 = JobSchedulerMaxThreads;
if ( JobSchedulerMaxThreads < 0 )
v8 = 0;
if ( v8 > 128 )
v8 = 128;
// ... and further down to 112 when a4 is set.
if ( a4 )
{
v9 = 112;
if ( v8 > 112 )
v8 = 112;
}
// Constructor flags: bits 1 and 2 always set, bit 4 added when a4 is set.
v10 = 3;
if ( a4 )
v10 |= 4u;
// 0x188 bytes are allocated for the JobQueue object.
v11 = operator new(0x188ui64);
if ( v11 )
v14 = JobQueue::JobQueue(v11, (unsigned int)v8, 0x40000i64, v7, v10, a1, a2);
else
v14 = 0i64;
qword_18BE66C78 = v14;
}
JobQueue::JobQueue构造函数
a2参数是线程数量。结合上面CreateJobQueue中的限制可以发现,线程数量最多不会超过128个;当a4为真时上限进一步降为112个,而CreateJobSystem调用CreateJobQueue时传入的a4正是1.
// Decompiler pseudocode excerpt; "..." marks lines elided by the article.
// Constructor for the object at a1. Visible flow: zero/initialise fields,
// create two capped semaphores sized by a2, create the atomic stack/queue and
// an initial group, decode the flag bits in a5, resize the array at a1 + 32 to
// a2 elements, then name and start one thread per element running
// JobQueue::WorkLoop.
//   a1     - the object being constructed (also the return value)
//   a2     - worker-thread count
//   a3     - stored at offset 40 of every array element before it is started
//   a4     - when >= 0, used as a bit offset for the per-thread value v16
//   a5     - flag bits (1, 2, 4) decoded below
//   a6, a7 - stored at a1 + 368 / a1 + 376 and passed to GetWorkerThreadName
__int64 __fastcall JobQueue::JobQueue(__int64 a1, unsigned int a2, __int64 a3, int a4, char a5, __int64 a6, __int64 a7)
{
...
AtomicList *Group; // rax
...
struct profiling::Marker *v13; // [rsp+38h] [rbp-220h]
...
Thread *v42; // [rsp+1F0h] [rbp-68h]
_QWORD *WorkerThreadName; // [rsp+1F8h] [rbp-60h]
...
// Fill 146 DWORDs starting at &v11 with -858993460 (0xCCCCCCCC).
// NOTE(review): presumably the debug-build uninitialised-stack fill pattern - confirm.
v7 = &v11;
for ( i = 146i64; i; --i )
{
*(_DWORD *)v7 = -858993460;
v7 = (__int64 *)((char *)v7 + 4);
}
v34 = -2i64;
v23 = a1 + 16;
*(_QWORD *)(a1 + 16) = 0i64;
*(_DWORD *)(v23 + 8) = 0;
// Initialise the array header at a1 + 32 (later resized to a2 elements).
v27 = (_QWORD *)(a1 + 32);
qmemcpy(v24, &kMemDynamicArray, 0xCui64);
*(_QWORD *)(a1 + 32) = 0i64;
qmemcpy(v25, (const void *)SetCurrentMemoryOwner(v26, v24), sizeof(v25));
qmemcpy(v27 + 1, v25, 0xCui64);
v27[3] = 0i64;
v27[4] = 1i64;
v29 = a1 + 72;
v28 = a1 + 72;
// Two capped semaphores, at a1 + 72 and a1 + 200, both created with the low 16 bits of a2.
Baselib_CappedSemaphore_Create_1(a1 + 72, (unsigned __int16)a2);
v30 = a1 + 200;
Baselib_CappedSemaphore_Create_1(a1 + 200, (unsigned __int16)a2);
*(_DWORD *)(a1 + 328) = 0;
*(_DWORD *)(a1 + 344) = 0;
*(_QWORD *)(a1 + 368) = a6;
*(_QWORD *)(a1 + 376) = a7;
v13 = (struct profiling::Marker *)&unk_18BE663B0;
profiler_begin((struct profiling::Marker *)&unk_18BE663B0);
v14 = 0i64;
v31 = a1 + 336;
*(_QWORD *)(a1 + 336) = 0i64;
*(_QWORD *)a1 = CreateAtomicStack();
qmemcpy(v21, &kMemThread, 0xCui64);
*(_QWORD *)(a1 + 8) = CreateAtomicQueue(v21);
v17[0] = 0i64;
// NOTE(review): "LODWORd" looks like a mangled LODWORD decompiler macro - confirm against the original listing.
LODWORd(v17[1]) = 0;
qmemcpy(v22, v17, sizeof(v22));
// Create an initial group and store its 16-byte group ID at a1 + 16, with the list tag at a1 + 24.
Group = JobQueue::CreateGroup(a1, 0, v22);
qmemcpy((void *)(a1 + 16), JobQueue::GetJobGroupID(a1, (__int64)v18, (__int64)Group), 0x10ui64);
*(_DWORD *)(a1 + 24) = AtomicList::Tag(*(AtomicList **)(a1 + 16));
// Flag bit 2 -> byte at a1 + 384; flag bit 4 -> byte at a1 + 385.
v35 = (a5 & 2) != 0;
*(_BYTE *)(a1 + 384) = v35;
v36 = (a5 & 4) != 0;
*(_BYTE *)(a1 + 385) = v36;
*(_DWORD *)(a1 + 360) = -1;
// Flag bit 1 with a non-zero thread count: allocate 64 bytes per thread at
// a1 + 352 and fill them with 0xFF; otherwise leave the pointer null.
if ( (a5 & 1) != 0 && a2 )
{
v37 = a2;
v19 = operator new[](saturated_mul(a2, 0x40ui64));
*(_QWORD *)(a1 + 352) = v19;
memset_0(*(void **)(a1 + 352), -1, (unsigned __int64)a2 << 6);
}
else
{
*(_QWORD *)(a1 + 352) = 0i64;
}
v38 = a1 + 32;
// One array element per worker thread.
dynamic_array::resize_initialized(a1 + 32, a2, 1i64);
JobQueue::SetActiveThreadCountTargetImpl((JobQueue *)a1, a2);
// Per-thread setup: back-pointer, name, a3, then start the thread.
for ( j = 0; j < a2; ++j )
{
// v16 defaults to -1; with a4 >= 0 it becomes a single bit at position j + a4.
// NOTE(review): presumably a processor-affinity mask; its use is not visible in this excerpt - confirm.
v16 = -1;
if ( a4 >= 0 )
{
v39 = 1;
v16 = 1 << (j + a4);
}
v40 = a1 + 32;
// Store the owning object (a1) at offset 96 of element j.
*(_QWORD *)(dynamic_array::operator[](a1 + 32, (int)j) + 96) = a1;
v41 = a1 + 32;
v42 = (Thread *)dynamic_array::operator[](a1 + 32, (int)j);
WorkerThreadName = (_QWORD *)GetWorkerThreadName(v20, a6, a7, j);
v32 = WorkerThreadName;
// Use the pointer in the first field if it is non-null, otherwise the
// storage that starts right after it.
if ( *WorkerThreadName )
v44 = (char *)*v32;
else
v44 = (char *)(v32 + 1);
v45 = v44;
Thread::SetName(v42, v44);
core::StringStorageDefault::deallocate(v20);
v46 = a1 + 32;
v33 = dynamic_array::operator[](a1 + 32, (int)j);
// NOTE(review): a3 is 0x40000 at the CreateJobQueue call site; presumably the thread stack size - confirm.
*(_QWORD *)(v33 + 40) = a3;
v47 = a1 + 32;
v48 = (Thread *)dynamic_array::operator[](a1 + 32, (int)j);
v49 = a1 + 32;
v50 = (void *)dynamic_array::operator[](a1 + 32, (int)j);
v12 = v16;
// Element j serves both as the Thread object and as the argument passed to WorkLoop.
Thread::Run(v48, (void *(__stdcall *)(void *))JobQueue::WorkLoop, v50, 0);
}
profiler_end(v13);
return a1;
}
调用系统API
Thread::Run -> PlatformThread::Create -> __imp_CreateThread // Windows API
至此JobQueue完成了线程的创建.
总结:JobSystem采用类似线程池的方式管理线程,线程数量在初始化时就已经确定,之后在使用JobSystem的时候不存在创建线程的消耗.