This is part two of the Work Contract series. Part one can be found here. Note: this page is a work in progress. More examples will be added soon.
BYOT:
Before looking at examples of how to use work contracts it is important to mention that the basic implementation of work contracts does not, on its own, provide the threading for the user. This is by design. A lot of developers would assume that any task management system would automatically provide the threading for them. However, it is a mistake to convolute threads and the work that those threads perform. By decoupling the two the developer has the ability to determine which threads will do what work, which cores the threads shall run on, how many threads there will be in total, have control over which signals shall interrupt those threads, top level exception handling and, very importantly, when and how those threads shall be started and stopped.There are many thread pool implementations. It is my opinion that a thread pool should be just that - a pool of threads. It should have the ability to stop and start those threads, determine which cores each thread should be assigned to, provide top level exception handling etc, and nothing more. What work the threads do is tangential to the thread pool itself.
For the remainder of this article I shall use my own thread pool implementation which can be found here.
Work Contract basics: groups, contract creation, scheduling and release
A work_contract represents a repeatable task which can be executed an arbitrary number of times. The execution of the task is performed asynchronously and is guaranteed to be thread safe.
work_contracts
are associated with a parent work_contract_group
and are created using work_contract_group::create_contract()
.
To exercise a work_contract
it must first be "scheduled". Scheduling a work contract is done using work_contract::schedule()
. This flags the work_contract
as ready to be executed.
Executing an scheduled work_contract
is achieved by worker threads which call work_contract_group::execute_next_contract()
on the work_contract_group
which created that work_contract
.
This worker thread is then responsible for executing the task associated with that work_contract
. Once the task has been executed the work_contract
returns to the non-scheduled state.
The following demonstrates the creation of a work_contract_group
, a work_contract
, scheduling the work_contract
and executing that work_contract
.
int main()
{
std::atomic<bool> done{false};
// create a non blocking work contract group
bcpp::system::work_contract_group workContractGroup(256);
// create a contract
auto workContract = std::move(workContractGroup.create_contract([&](){std::cout << "executed\n"; done = true;}));
// create async thread to process the work contract
std::jthread workerThread([&](){while (!done) workContractGroup.execute_next_contract();});
// delay for one second before scheduling the work contract
std::this_thread::sleep_for(std::chrono::seconds(1));
// schedule and it should be executed very soon by waiting worker thread
workContract.schedule();
while (!done);
return 0;
}
A work_contract
can also optionally define a one-shot release task. As with the primary task, a work_contract
release task is performed asynchronously and is guaranteed to be thread safe. Release tasks are also executed by worker threads which call work_contract_group::execute_next_contract()
.
Furthermore, the release task is guaranteed to be the final task for the work_contract
. Once the release work task is executed, neither the release task, nor the primary work task shall ever be executed again. It is accurate to think of the release task as the "destructor" for the work_contract
. The release task can be explicitly scheduled via work_contract::release()
or, optionally, when the work_contract is destroyed.
std::atomic<bool> done{false};
std::atomic<bool> released{false};
void work_function(){std::cout << "work executed\n"; done = true;}
void release_function(){std::cout << "contract released\n"; released = true;}
int main()
{
bcpp::system::work_contract_group workContractGroup(256);
auto workContract = workContractGroup.create_contract(work_function, release_function);
std::jthread workerThread([&](){while (!released) workContractGroup.execute_next_contract();});
workContract.schedule();
while (!done);
workContract.release();
while (!released);
return 0;
}
Work Contracts can be executed multiple times:
Work contracts are not "tasks" in the traditional sense. Work contracts can be executed multiple times. The following demonstrates this:
int main()
{
std::atomic<bool> done{false};
// create a non blocking work contract group
bcpp::system::work_contract_group workContractGroup(256);
// create a contract
bcpp::system::work_contract workContract = std::move(workContractGroup.create_contract(
[&, counter = 0]
(
) mutable
{
static auto constexpr max_counter = 5;
std::cout << "executed " << ++counter << " times\n";
if (counter >= max_counter)
workContract.release();
else
workContract.schedule();
}));
workContract.schedule();
// create async thread to process the work contract
std::jthread workerThread([&](){while (workContract) workContractGroup.execute_next_contract();});
while (workContract)
; // stick arount until the work contract is released
return 0;
}
Scheduling an already scheduled work contract does nothing:
A work_contract
is in one of two states. It is either scheduled or it is not. Once a work_contract
is executed this state transitions from scheduled to not scheduled.
Repeated schedulings of a currently scheduled work_contract
does nothing to further mutate its state as it is already in the scheduled state.
The following example schedules the work_contract
multiple times prior to executing the work_contract
. The result is a single execution of the work function.
bcpp::system::work_contract workContract;
std::atomic<int> counter{5};
std::atomic<bool> released{false};
void work_function()
{
if (counter > 0)
std::cout << "work executed. counter = " << --counter << "\n";
else
workContract.release();
workContract.schedule();
}
void release_function(){std::cout << "contract released\n"; released = true;}
int main()
{
bcpp::system::work_contract_group workContractGroup(256);
workContract = std::move(workContractGroup.create_contract(work_function, release_function));
for (auto i = 0; i < 100; ++i)
workContract.schedule(); // repeated schedulings does nothing
std::jthread workerThread([&](){while (!released) workContractGroup.execute_next_contract();});
while (!released);
return 0;
}
Work contacts can be either blocking or non-blocking:
There are two forms of work contract groups - blocking and non-blocking. The non-blocking form is slightly optimized for performance as it
not required to maintain a count of scheduled work contracts. The blocking form is almost as peformant as the non-blocking for, however,
work contracts created by a non blocking work contract group supports blocking calls to blocking_work_contract_group::execute_next_contract(timeout)
The following demonstrates the usage of blocking_work_contract_group
int main()
{
std::atomic<bool> done{false};
// create a blocking work contract group
bcpp::system::blocking_work_contract_group workContractGroup(256);
// create a contract
auto workContract = std::move(workContractGroup.create_contract([&](){std::cout << "executed\n"; done = true;}));
// create async thread to process the work contract within the next 3 seconds
std::jthread workerThread([&](){workContractGroup.execute_next_contract(std::chrono::seconds(3));});
// delay for one second before scheduling the work contract
std::this_thread::sleep_for(std::chrono::seconds(1));
// schedule and it should be executed very soon by waiting worker thread
workContract.schedule();
while (!done)
;
return 0;
}