1
0
Fork 0

Rewrite semaphore handling for transfer workers.

This commit is contained in:
Dario 2024-10-15 18:30:55 -03:00
parent 04692d83cb
commit 7a936e8bac
2 changed files with 29 additions and 15 deletions

View File

@ -5052,7 +5052,6 @@ RenderingDevice::TransferWorker *RenderingDevice::_acquire_transfer_worker(uint3
// No existing worker was picked, we create a new one. // No existing worker was picked, we create a new one.
transfer_worker = memnew(TransferWorker); transfer_worker = memnew(TransferWorker);
transfer_worker->command_fence = driver->fence_create(); transfer_worker->command_fence = driver->fence_create();
transfer_worker->command_semaphore = driver->semaphore_create();
transfer_worker->command_pool = driver->command_pool_create(transfer_queue_family, RDD::COMMAND_BUFFER_TYPE_PRIMARY); transfer_worker->command_pool = driver->command_pool_create(transfer_queue_family, RDD::COMMAND_BUFFER_TYPE_PRIMARY);
transfer_worker->command_buffer = driver->command_buffer_create(transfer_worker->command_pool); transfer_worker->command_buffer = driver->command_buffer_create(transfer_worker->command_pool);
transfer_worker->index = transfer_worker_pool.size(); transfer_worker->index = transfer_worker_pool.size();
@ -5075,7 +5074,7 @@ RenderingDevice::TransferWorker *RenderingDevice::_acquire_transfer_worker(uint3
// If there's not enough bytes to use on the staging buffer, we submit everything pending from the worker and wait for the work to be finished. // If there's not enough bytes to use on the staging buffer, we submit everything pending from the worker and wait for the work to be finished.
if (transfer_worker->recording) { if (transfer_worker->recording) {
_end_transfer_worker(transfer_worker); _end_transfer_worker(transfer_worker);
_submit_transfer_worker(transfer_worker, false); _submit_transfer_worker(transfer_worker);
} }
if (transfer_worker->submitted) { if (transfer_worker->submitted) {
@ -5128,12 +5127,12 @@ void RenderingDevice::_end_transfer_worker(TransferWorker *p_transfer_worker) {
p_transfer_worker->recording = false; p_transfer_worker->recording = false;
} }
void RenderingDevice::_submit_transfer_worker(TransferWorker *p_transfer_worker, bool p_signal_semaphore) { void RenderingDevice::_submit_transfer_worker(TransferWorker *p_transfer_worker, VectorView<RDD::SemaphoreID> p_signal_semaphores) {
const VectorView<RDD::SemaphoreID> execute_semaphore = p_signal_semaphore ? p_transfer_worker->command_semaphore : VectorView<RDD::SemaphoreID>(); driver->command_queue_execute_and_present(transfer_queue, {}, p_transfer_worker->command_buffer, p_signal_semaphores, p_transfer_worker->command_fence, {});
driver->command_queue_execute_and_present(transfer_queue, {}, p_transfer_worker->command_buffer, execute_semaphore, p_transfer_worker->command_fence, {});
if (p_signal_semaphore) { for (uint32_t i = 0; i < p_signal_semaphores.size(); i++) {
// Indicate the frame should wait on these semaphores before executing the main command buffer. // Indicate the frame should wait on these semaphores before executing the main command buffer.
frames[frame].semaphores_to_wait_on.push_back(p_transfer_worker->command_semaphore); frames[frame].semaphores_to_wait_on.push_back(p_signal_semaphores[i]);
} }
p_transfer_worker->submitted = true; p_transfer_worker->submitted = true;
@ -5196,7 +5195,8 @@ void RenderingDevice::_check_transfer_worker_index_array(IndexArray *p_index_arr
void RenderingDevice::_submit_transfer_workers(bool p_operations_used_by_draw) { void RenderingDevice::_submit_transfer_workers(bool p_operations_used_by_draw) {
MutexLock transfer_worker_lock(transfer_worker_pool_mutex); MutexLock transfer_worker_lock(transfer_worker_pool_mutex);
for (TransferWorker *worker : transfer_worker_pool) { for (uint32_t i = 0; i < transfer_worker_pool.size(); i++) {
TransferWorker *worker = transfer_worker_pool[i];
if (p_operations_used_by_draw) { if (p_operations_used_by_draw) {
MutexLock lock(worker->operations_mutex); MutexLock lock(worker->operations_mutex);
if (worker->operations_processed >= transfer_worker_operation_used_by_draw[worker->index]) { if (worker->operations_processed >= transfer_worker_operation_used_by_draw[worker->index]) {
@ -5208,8 +5208,9 @@ void RenderingDevice::_submit_transfer_workers(bool p_operations_used_by_draw) {
{ {
MutexLock lock(worker->thread_mutex); MutexLock lock(worker->thread_mutex);
if (worker->recording) { if (worker->recording) {
VectorView<RDD::SemaphoreID> semaphores = p_operations_used_by_draw ? frames[frame].transfer_worker_semaphores[i] : VectorView<RDD::SemaphoreID>();
_end_transfer_worker(worker); _end_transfer_worker(worker);
_submit_transfer_worker(worker, true); _submit_transfer_worker(worker, semaphores);
} }
} }
} }
@ -5228,7 +5229,6 @@ void RenderingDevice::_wait_for_transfer_workers() {
void RenderingDevice::_free_transfer_workers() { void RenderingDevice::_free_transfer_workers() {
MutexLock transfer_worker_lock(transfer_worker_pool_mutex); MutexLock transfer_worker_lock(transfer_worker_pool_mutex);
for (TransferWorker *worker : transfer_worker_pool) { for (TransferWorker *worker : transfer_worker_pool) {
driver->semaphore_free(worker->command_semaphore);
driver->fence_free(worker->command_fence); driver->fence_free(worker->command_fence);
driver->buffer_free(worker->staging_buffer); driver->buffer_free(worker->staging_buffer);
driver->command_pool_free(worker->command_pool); driver->command_pool_free(worker->command_pool);
@ -6014,6 +6014,9 @@ Error RenderingDevice::initialize(RenderingContextDriver *p_context, DisplayServ
present_queue_family = main_queue_family; present_queue_family = main_queue_family;
} }
// Use the processor count as the max amount of transfer workers that can be created.
transfer_worker_pool_max_size = OS::get_singleton()->get_processor_count();
// Create data for all the frames. // Create data for all the frames.
for (uint32_t i = 0; i < frames.size(); i++) { for (uint32_t i = 0; i < frames.size(); i++) {
frames[i].index = 0; frames[i].index = 0;
@ -6041,6 +6044,13 @@ Error RenderingDevice::initialize(RenderingContextDriver *p_context, DisplayServ
// Assign the main queue family and command pool to the command buffer pool. // Assign the main queue family and command pool to the command buffer pool.
frames[i].command_buffer_pool.pool = frames[i].command_pool; frames[i].command_buffer_pool.pool = frames[i].command_pool;
// Create the semaphores for the transfer workers.
frames[i].transfer_worker_semaphores.resize(transfer_worker_pool_max_size);
for (uint32_t j = 0; j < transfer_worker_pool_max_size; j++) {
frames[i].transfer_worker_semaphores[j] = driver->semaphore_create();
ERR_FAIL_COND_V(!frames[i].transfer_worker_semaphores[j], FAILED);
}
} }
// Start from frame count, so everything else is immediately old. // Start from frame count, so everything else is immediately old.
@ -6087,9 +6097,6 @@ Error RenderingDevice::initialize(RenderingContextDriver *p_context, DisplayServ
ERR_FAIL_COND_V(err, FAILED); ERR_FAIL_COND_V(err, FAILED);
} }
// TODO: How should this max size be determined?
transfer_worker_pool_max_size = OS::get_singleton()->get_processor_count();
draw_list = nullptr; draw_list = nullptr;
compute_list = nullptr; compute_list = nullptr;
@ -6452,6 +6459,10 @@ void RenderingDevice::finalize() {
for (uint32_t j = 0; j < buffer_pool.buffers.size(); j++) { for (uint32_t j = 0; j < buffer_pool.buffers.size(); j++) {
driver->semaphore_free(buffer_pool.semaphores[j]); driver->semaphore_free(buffer_pool.semaphores[j]);
} }
for (uint32_t j = 0; j < frames[i].transfer_worker_semaphores.size(); j++) {
driver->semaphore_free(frames[i].transfer_worker_semaphores[j]);
}
} }
if (pipeline_cache_enabled) { if (pipeline_cache_enabled) {

View File

@ -1267,7 +1267,6 @@ private:
RDD::CommandBufferID command_buffer; RDD::CommandBufferID command_buffer;
RDD::CommandPoolID command_pool; RDD::CommandPoolID command_pool;
RDD::FenceID command_fence; RDD::FenceID command_fence;
RDD::SemaphoreID command_semaphore;
bool recording = false; bool recording = false;
bool submitted = false; bool submitted = false;
BinaryMutex thread_mutex; BinaryMutex thread_mutex;
@ -1287,7 +1286,7 @@ private:
TransferWorker *_acquire_transfer_worker(uint32_t p_transfer_size, uint32_t p_required_align, uint32_t &r_staging_offset); TransferWorker *_acquire_transfer_worker(uint32_t p_transfer_size, uint32_t p_required_align, uint32_t &r_staging_offset);
void _release_transfer_worker(TransferWorker *p_transfer_worker); void _release_transfer_worker(TransferWorker *p_transfer_worker);
void _end_transfer_worker(TransferWorker *p_transfer_worker); void _end_transfer_worker(TransferWorker *p_transfer_worker);
void _submit_transfer_worker(TransferWorker *p_transfer_worker, bool p_signal_semaphore); void _submit_transfer_worker(TransferWorker *p_transfer_worker, VectorView<RDD::SemaphoreID> p_signal_semaphores = VectorView<RDD::SemaphoreID>());
void _wait_for_transfer_worker(TransferWorker *p_transfer_worker); void _wait_for_transfer_worker(TransferWorker *p_transfer_worker);
void _check_transfer_worker_operation(uint32_t p_transfer_worker_index, uint64_t p_transfer_worker_operation); void _check_transfer_worker_operation(uint32_t p_transfer_worker_index, uint64_t p_transfer_worker_operation);
void _check_transfer_worker_buffer(Buffer *p_buffer); void _check_transfer_worker_buffer(Buffer *p_buffer);
@ -1372,6 +1371,10 @@ private:
// Swap chains prepared for drawing during the frame that must be presented. // Swap chains prepared for drawing during the frame that must be presented.
LocalVector<RDD::SwapChainID> swap_chains_to_present; LocalVector<RDD::SwapChainID> swap_chains_to_present;
// Semaphores the transfer workers can use to wait before rendering the frame.
// This must have the same size of the transfer worker pool.
TightLocalVector<RDD::SemaphoreID> transfer_worker_semaphores;
// Extra command buffer pool used for driver workarounds. // Extra command buffer pool used for driver workarounds.
RDG::CommandBufferPool command_buffer_pool; RDG::CommandBufferPool command_buffer_pool;