Architecting systems with modern C++

Work Contracts: Usage Guide
August 27th, 2023



This is part two of the Work Contract series. Part one can be found here. Note: this page is a work in progress. More examples will be added soon.

BYOT:

Before looking at examples of how to use work contracts, it is important to mention that the basic implementation of work contracts does not, on its own, provide the threading for the user. This is by design. Many developers would assume that any task management system automatically provides the threading for them. However, it is a mistake to conflate threads with the work that those threads perform. Decoupling the two lets the developer determine which threads will do what work, which cores the threads shall run on, how many threads there will be in total, which signals shall interrupt those threads, how top level exceptions are handled and, very importantly, when and how those threads shall be started and stopped.

There are many thread pool implementations. It is my opinion that a thread pool should be just that - a pool of threads. It should have the ability to stop and start those threads, determine which cores each thread should be assigned to, provide top level exception handling etc, and nothing more. What work the threads do is tangential to the thread pool itself.
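
To make that separation concrete, here is a minimal sketch of a pool which owns nothing but threads. It is not the thread pool used in the rest of this article. The names simple_thread_pool and run_pool_demo are hypothetical, core affinity is left out because it is platform specific, and std::thread with an explicit stop flag is used so that the sketch does not depend on C++20. Each thread simply calls whatever function the caller gave it, with top level exception handling, until the pool is stopped.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical illustration: a pool which owns threads and nothing else.
// What each thread does is supplied entirely by the caller.
class simple_thread_pool
{
public:

    // starts one thread per entry in 'workers'
    explicit simple_thread_pool(std::vector<std::function<void()>> workers)
    {
        for (auto & worker : workers)
        {
            threads_.emplace_back([this, work = std::move(worker)]()
                    {
                        while (!stopped_)
                        {
                            try {work();}
                            catch (...) {++exceptionCount_;} // top level exception handling
                        }
                    });
        }
    }

    ~simple_thread_pool(){stop();}

    // signal every thread to stop and wait for each one to exit
    void stop()
    {
        stopped_ = true;
        for (auto & thread : threads_)
            if (thread.joinable())
                thread.join();
    }

    std::size_t exception_count() const{return exceptionCount_;}

private:
    std::atomic<bool> stopped_{false};
    std::atomic<std::size_t> exceptionCount_{0};
    std::vector<std::thread> threads_; // declared last so the flags exist before any thread starts
};

// usage: the caller decides how many threads exist and what each one does
inline bool run_pool_demo()
{
    std::atomic<std::size_t> counter{0};
    std::vector<std::function<void()>> workers;
    workers.emplace_back([&](){++counter;});
    workers.emplace_back([](){throw std::runtime_error("contained by the pool");});

    simple_thread_pool pool(std::move(workers));
    while ((counter == 0) || (pool.exception_count() == 0))
        std::this_thread::yield(); // wait until both threads have done something
    pool.stop();
    assert(counter > 0);
    return (counter > 0) && (pool.exception_count() > 0);
}
```

Note that the pool knows nothing about work contracts. In the examples which follow, the function handed to each thread is just a loop around work_contract_group::execute_next_contract().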

For the remainder of this article I shall use my own thread pool implementation which can be found here.

Work Contract basics: groups, contract creation, scheduling and release

A work_contract represents a repeatable task which can be executed an arbitrary number of times. The execution of the task is performed asynchronously and is guaranteed to be thread safe. work_contracts are associated with a parent work_contract_group and are created using work_contract_group::create_contract().

To exercise a work_contract it must first be "scheduled". Scheduling a work contract is done using work_contract::schedule(). This flags the work_contract as ready to be executed. A scheduled work_contract is executed by a worker thread which calls work_contract_group::execute_next_contract() on the work_contract_group which created that work_contract. This worker thread is then responsible for executing the task associated with that work_contract. Once the task has been executed the work_contract returns to the non-scheduled state.

The following demonstrates the creation of a work_contract_group, a work_contract, scheduling the work_contract and executing that work_contract.


  int main()
  {
    std::atomic<bool> done{false};

    // create a non blocking work contract group
    bcpp::system::work_contract_group workContractGroup(256);
    
    // create a contract
    auto workContract = workContractGroup.create_contract([&](){std::cout << "executed\n"; done = true;});
    
    // create async thread to process the work contract
    std::jthread workerThread([&](){while (!done) workContractGroup.execute_next_contract();});
    
    // delay for one second before scheduling the work contract
    std::this_thread::sleep_for(std::chrono::seconds(1));
    
    // schedule and it should be executed very soon by waiting worker thread
    workContract.schedule();
    
    while (!done);
    return 0;
  }
  

A work_contract can also optionally define a one-shot release task. As with the primary task, a work_contract release task is performed asynchronously and is guaranteed to be thread safe. Release tasks are also executed by worker threads which call work_contract_group::execute_next_contract().

Furthermore, the release task is guaranteed to be the final task for the work_contract. Once the release task has been executed, neither the release task nor the primary work task shall ever be executed again. It is accurate to think of the release task as the "destructor" for the work_contract. The release task can be scheduled explicitly via work_contract::release() or, optionally, scheduled automatically when the work_contract is destroyed.


  std::atomic<bool> done{false};
  std::atomic<bool> released{false};
  
  void work_function(){std::cout << "work executed\n"; done = true;}
  void release_function(){std::cout << "contract released\n"; released = true;}
  int main()
  {
      bcpp::system::work_contract_group workContractGroup(256);
      auto workContract = workContractGroup.create_contract(work_function, release_function);
      std::jthread workerThread([&](){while (!released) workContractGroup.execute_next_contract();});
      
      workContract.schedule();
      while (!done);
      workContract.release();
      while (!released);
      return 0;
  }
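
The release guarantees can also be modeled in a few lines. What follows is a simplified, single threaded sketch and not the library's implementation. The names toy_contract and run_release_demo are hypothetical, and the choice to serve a pending release before pending work is an assumption of the model. It shows that the release task runs once and that nothing runs after it.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical single threaded model of the work/release life cycle.
class toy_contract
{
public:
    toy_contract(std::function<void()> work, std::function<void()> release) :
        work_(std::move(work)), release_(std::move(release)){}

    void schedule(){scheduled_ = true;}
    void release(){releaseRequested_ = true;}

    // what a worker thread would do for this contract.
    // returns true if either task was executed
    bool execute()
    {
        if (released_)
            return false; // the release task was final: nothing runs again
        if (releaseRequested_)
        {
            // assumption of this model: a pending release is served before pending work
            released_ = true;
            release_();
            return true;
        }
        if (!scheduled_)
            return false;
        scheduled_ = false; // return to the non-scheduled state
        work_();
        return true;
    }

private:
    std::function<void()> work_;
    std::function<void()> release_;
    bool scheduled_{false};
    bool releaseRequested_{false};
    bool released_{false};
};

// returns {number of work executions, number of release executions}
inline std::pair<int, int> run_release_demo()
{
    int workCount = 0;
    int releaseCount = 0;
    toy_contract contract([&](){++workCount;}, [&](){++releaseCount;});

    contract.schedule();
    contract.execute();   // executes the work task
    contract.release();
    contract.execute();   // executes the release task

    contract.schedule();  // too late: the contract has already been released
    contract.release();
    bool const executedAfterRelease = contract.execute();
    assert(!executedAfterRelease);
    return {workCount, releaseCount};
}
```

In this model run_release_demo() reports one work execution and one release execution, regardless of the calls made after the release.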
  

Work Contracts can be executed multiple times:

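Work contracts are not "tasks" in the traditional sense: the same work_contract can be scheduled and executed any number of times. The following demonstrates this with a work contract which reschedules itself from within its own work function until it has been executed five times: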


  int main()
  {
    std::atomic<bool> done{false};

    // create a non blocking work contract group
    bcpp::system::work_contract_group workContractGroup(256);
    // create a contract
    bcpp::system::work_contract workContract = workContractGroup.create_contract(
            [&, counter = 0]
            (
            ) mutable
            {
                static auto constexpr max_counter = 5;
                std::cout << "executed " << ++counter << " times\n";
                if (counter >= max_counter)
                    workContract.release();
                else
                    workContract.schedule();
            });
    workContract.schedule();
    
    // create async thread to process the work contract
    std::jthread workerThread([&](){while (workContract) workContractGroup.execute_next_contract();});

    while (workContract)
        ; // stick around until the work contract is released

    return 0;
  }
  

Scheduling an already scheduled work contract does nothing:

A work_contract is in one of two states: it is either scheduled or it is not. Once a work_contract is executed its state transitions from scheduled to not scheduled. Scheduling a work_contract which is already scheduled does nothing further, since it is already in the scheduled state. The following example schedules the work_contract 100 times prior to executing it. Those 100 calls result in a single execution of the work function; every subsequent execution in the example occurs only because the work function reschedules itself.


  bcpp::system::work_contract workContract;
  std::atomic<int> counter{5};
  std::atomic<bool> released{false};
  
  
  void work_function()
  {
      if (counter > 0)
      {
          std::cout << "work executed. counter = " << --counter << "\n";
          workContract.schedule(); // reschedule until the counter reaches zero
      }
      else
      {
          workContract.release();
      }
  }
  
  void release_function(){std::cout << "contract released\n"; released = true;}
  
  int main()
  {
      bcpp::system::work_contract_group workContractGroup(256);
      workContract = workContractGroup.create_contract(work_function, release_function);
      
      for (auto i = 0; i < 100; ++i)
            workContract.schedule(); // repeated scheduling does nothing
            
      std::jthread workerThread([&](){while (!released) workContractGroup.execute_next_contract();});
  
      while (!released);
      return 0;
  }
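
One way to picture why repeated scheduling collapses into a single execution is to model the scheduled state as a single flag. This is a sketch of the idea only, not the library's implementation, and the names toy_flag_contract and run_schedule_demo are hypothetical. Setting a flag which is already set changes nothing, and executing clears the flag before running the task, which is also what allows a task to reschedule itself.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical model: the entire "scheduled" state is one atomic flag.
class toy_flag_contract
{
public:
    explicit toy_flag_contract(std::function<void()> work) : work_(std::move(work)){}

    // setting an already set flag changes nothing
    void schedule(){scheduled_.store(true);}

    // clear the flag, then run the task only if the flag had been set.
    // returns true if the task was executed
    bool execute()
    {
        if (!scheduled_.exchange(false))
            return false;
        work_();
        return true;
    }

private:
    std::function<void()> work_;
    std::atomic<bool> scheduled_{false};
};

// returns the total number of times the task was executed
inline int run_schedule_demo()
{
    int executions = 0;
    toy_flag_contract contract([&](){++executions;});

    for (auto i = 0; i < 100; ++i)
        contract.schedule(); // 100 calls, one flag
    while (contract.execute())
        ; // drain everything which is pending
    assert(executions == 1);

    contract.schedule(); // scheduling again after execution does count
    contract.execute();
    return executions;
}
```

Here the first 100 calls to schedule() produce one execution and the later, separate call produces a second, so run_schedule_demo() returns 2.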
  

Work contracts can be either blocking or non-blocking:

There are two forms of work contract groups: blocking and non-blocking. The non-blocking form is slightly optimized for performance as it is not required to maintain a count of scheduled work contracts. The blocking form is almost as performant as the non-blocking form and, in addition, supports blocking calls to blocking_work_contract_group::execute_next_contract(timeout), which wait up to the given timeout for a scheduled work contract to execute.

The following demonstrates the usage of blocking_work_contract_group:


    int main()
    {
        std::atomic<bool> done{false};

        // create a blocking work contract group
        bcpp::system::blocking_work_contract_group workContractGroup(256);
        
        // create a contract
        auto workContract = workContractGroup.create_contract([&](){std::cout << "executed\n"; done = true;});
        
        // create async thread to process the work contract within the next 3 seconds
        std::jthread workerThread([&](){workContractGroup.execute_next_contract(std::chrono::seconds(3));});
        
        // delay for one second before scheduling the work contract
        std::this_thread::sleep_for(std::chrono::seconds(1));
        
        // schedule and it should be executed very soon by waiting worker thread
        workContract.schedule();
        
        while (!done)
            ;
        return 0;
    }
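
The extra bookkeeping which a blocking call needs can be illustrated with a count and a condition variable. The sketch below is an assumption about how such a wait could be built, not a description of how blocking_work_contract_group is implemented. The names toy_blocking_counter, on_scheduled, wait_for_scheduled and run_blocking_demo are all hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

// Hypothetical model: a count of scheduled contracts which a worker can wait on.
class toy_blocking_counter
{
public:

    // called whenever a contract becomes scheduled
    void on_scheduled()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++scheduledCount_;
        }
        conditionVariable_.notify_one();
    }

    // wait until something is scheduled or the timeout expires.
    // returns true if a scheduled contract was claimed
    bool wait_for_scheduled(std::chrono::nanoseconds timeout)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!conditionVariable_.wait_for(lock, timeout, [this](){return scheduledCount_ > 0;}))
            return false; // timed out with nothing scheduled
        --scheduledCount_;
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable conditionVariable_;
    std::size_t scheduledCount_{0};
};

inline bool run_blocking_demo()
{
    toy_blocking_counter counter;

    // nothing has been scheduled, so a short wait simply times out
    bool const timedOut = !counter.wait_for_scheduled(std::chrono::milliseconds(10));

    // schedule from another thread while this thread is prepared to wait
    std::thread scheduler([&](){counter.on_scheduled();});
    bool const claimed = counter.wait_for_scheduled(std::chrono::seconds(3));
    scheduler.join();

    assert(timedOut);
    assert(claimed);
    return timedOut && claimed;
}
```

A non-blocking group has no need for the count, the mutex or the notification, which is consistent with it being the slightly faster of the two forms.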