OR-Tools  8.1
threadpool.cc
Go to the documentation of this file.
1 // Copyright 2010-2018 Google LLC
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
15 
#include <utility>

#include "ortools/base/logging.h"
17 
18 namespace operations_research {
19 void RunWorker(void* data) {
20  ThreadPool* const thread_pool = reinterpret_cast<ThreadPool*>(data);
21  std::function<void()> work = thread_pool->GetNextTask();
22  while (work != NULL) {
23  work();
24  work = thread_pool->GetNextTask();
25  }
26 }
27 
28 ThreadPool::ThreadPool(const std::string& prefix, int num_workers)
29  : num_workers_(num_workers) {}
30 
32  if (started_) {
33  std::unique_lock<std::mutex> mutex_lock(mutex_);
34  waiting_to_finish_ = true;
35  mutex_lock.unlock();
36  condition_.notify_all();
37  for (int i = 0; i < num_workers_; ++i) {
38  all_workers_[i].join();
39  }
40  }
41 }
42 
44  CHECK_GT(capacity, num_workers_);
45  CHECK(!started_);
46  queue_capacity_ = capacity;
47 }
48 
50  started_ = true;
51  for (int i = 0; i < num_workers_; ++i) {
52  all_workers_.push_back(std::thread(&RunWorker, this));
53  }
54 }
55 
56 std::function<void()> ThreadPool::GetNextTask() {
57  std::unique_lock<std::mutex> lock(mutex_);
58  for (;;) {
59  if (!tasks_.empty()) {
60  std::function<void()> task = tasks_.front();
61  tasks_.pop_front();
62  if (tasks_.size() < queue_capacity_ && waiting_for_capacity_) {
63  waiting_for_capacity_ = false;
64  capacity_condition_.notify_all();
65  }
66  return task;
67  }
68  if (waiting_to_finish_) {
69  return nullptr;
70  } else {
71  condition_.wait(lock);
72  }
73  }
74  return nullptr;
75 }
76 
77 void ThreadPool::Schedule(std::function<void()> closure) {
78  std::unique_lock<std::mutex> lock(mutex_);
79  while (tasks_.size() >= queue_capacity_) {
80  waiting_for_capacity_ = true;
81  capacity_condition_.wait(lock);
82  }
83  tasks_.push_back(closure);
84  if (started_) {
85  lock.unlock();
86  condition_.notify_all();
87  }
88 }
89 
90 } // namespace operations_research
threadpool.h
logging.h
CHECK_GT
#define CHECK_GT(val1, val2)
Definition: base/logging.h:702
operations_research
The vehicle routing library lets one model and solve generic vehicle routing problems ranging from th...
Definition: dense_doubly_linked_list.h:21
operations_research::ThreadPool::Schedule
void Schedule(std::function< void()> closure)
Definition: threadpool.cc:77
operations_research::ThreadPool::StartWorkers
void StartWorkers()
Definition: threadpool.cc:49
operations_research::ThreadPool::GetNextTask
std::function< void()> GetNextTask()
Definition: threadpool.cc:56
operations_research::ThreadPool::ThreadPool
ThreadPool(const std::string &prefix, int num_threads)
Definition: threadpool.cc:28
operations_research::ThreadPool::~ThreadPool
~ThreadPool()
Definition: threadpool.cc:31
operations_research::RunWorker
void RunWorker(void *data)
Definition: threadpool.cc:19
capacity
int64 capacity
Definition: routing_flow.cc:129
operations_research::ThreadPool
Definition: threadpool.h:26
CHECK
#define CHECK(condition)
Definition: base/logging.h:495
operations_research::ThreadPool::SetQueueCapacity
void SetQueueCapacity(int capacity)
Definition: threadpool.cc:43