-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathWorkerDirectory.hpp
243 lines (220 loc) · 8.7 KB
/
WorkerDirectory.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef QUICKSTEP_QUERY_EXECUTION_WORKER_DIRECTORY_HPP_
#define QUICKSTEP_QUERY_EXECUTION_WORKER_DIRECTORY_HPP_
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <utility>
#include <vector>
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
/** \addtogroup QueryExecution
* @{
*/
/**
* @brief A class which keeps the metadata about the workers.
*
* @note The per-worker workorder counts kept by this class are as viewed by
* Foreman. An alternative implementation could use TMB to query the queue
* length of each worker.
*
* @note This class is intended to be used only by Foreman thread. Therefore
* none of the methods in this class are thread-safe.
**/
class WorkerDirectory {
 public:
  /**
   * @brief Constructor.
   *
   * @note Both vectors are copied into the directory, and every worker starts
   *       with zero queued workorders.
   *
   * @param num_workers The number of workers. Expected to be greater than
   *        zero (checked with DEBUG_ASSERT).
   * @param client_ids A vector of TMB client IDs for the workers. Expected to
   *        contain exactly num_workers elements (checked with DEBUG_ASSERT).
   * @param numa_node_ids A vector of NUMA node IDs where the workers are
   *        pinned. If a worker is not pinned to any NUMA node, the NUMA node
   *        ID for that worker is -1. Expected to contain exactly num_workers
   *        elements (checked with DEBUG_ASSERT).
   **/
  WorkerDirectory(const std::size_t num_workers,
                  const std::vector<client_id> &client_ids,
                  const std::vector<int> &numa_node_ids)
      : num_workers_(num_workers),
        num_queued_workorders_(num_workers, 0),
        numa_node_ids_(numa_node_ids),
        client_ids_(client_ids) {
    DEBUG_ASSERT(num_workers > 0);
    DEBUG_ASSERT(client_ids.size() == num_workers);
    DEBUG_ASSERT(numa_node_ids.size() == num_workers);
  }

  /**
   * @brief Get the number of worker threads.
   *
   * @return The number of worker threads at the time the function is called.
   **/
  inline std::size_t getNumWorkers() const {
    return num_workers_;
  }

  /**
   * @brief Get the number of queued workorders for the given worker.
   *
   * @note The number of queued workorders consists of the workorders waiting
   *       for execution and the workorder being executed by the specified
   *       worker at the time this function is called.
   *
   * @param worker_thread_index The logical ID of the given worker.
   *
   * @return The number of queued workorders.
   **/
  inline std::size_t getNumQueuedWorkOrders(
      const std::size_t worker_thread_index) const {
    DEBUG_ASSERT(worker_thread_index < num_workers_);
    return num_queued_workorders_[worker_thread_index];
  }

  /**
   * @brief Increment the number of queued workorders for the given worker by 1.
   *
   * @param worker_thread_index The logical ID of the given worker.
   **/
  inline void incrementNumQueuedWorkOrders(const std::size_t worker_thread_index) {
    DEBUG_ASSERT(worker_thread_index < num_workers_);
    ++num_queued_workorders_[worker_thread_index];
  }

  /**
   * @brief Decrement the number of queued workorders for the given worker by 1.
   *
   * @note The counter is unsigned, so the caller must only decrement a worker
   *       that has at least one queued workorder (checked with DEBUG_ASSERT).
   *
   * @param worker_thread_index The logical ID of the given worker.
   **/
  inline void decrementNumQueuedWorkOrders(const std::size_t worker_thread_index) {
    DEBUG_ASSERT(worker_thread_index < num_workers_);
    DEBUG_ASSERT(num_queued_workorders_[worker_thread_index] >= 1);
    --num_queued_workorders_[worker_thread_index];
  }

  /**
   * @brief Get the NUMA node where the specified worker is pinned to.
   *
   * @param worker_thread_index The logical ID of the given worker.
   *
   * @return The NUMA node ID where the given worker is pinned. If the worker
   *         hasn't been pinned to any NUMA node, this value is -1.
   **/
  inline int getNUMANode(const std::size_t worker_thread_index) const {
    DEBUG_ASSERT(worker_thread_index < num_workers_);
    return numa_node_ids_[worker_thread_index];
  }

  /**
   * @brief Get the TMB client ID of the specified worker.
   *
   * @param worker_thread_index The logical ID of the given worker.
   *
   * @return The TMB client ID of the given worker.
   **/
  inline client_id getClientID(const std::size_t worker_thread_index) const {
    DEBUG_ASSERT(worker_thread_index < num_workers_);
    return client_ids_[worker_thread_index];
  }

  /**
   * @brief Generate address of a worker.
   *
   * @param worker_thread_index The logical ID of the given worker.
   *
   * @return TMB Address of the given worker, i.e. an Address whose only
   *         recipient added here is the worker's client ID.
   **/
  inline Address getWorkerAddress(const std::size_t worker_thread_index) const {
    DEBUG_ASSERT(worker_thread_index < num_workers_);
    Address worker_address;
    worker_address.AddRecipient(client_ids_[worker_thread_index]);
    return worker_address;
  }

  /**
   * @brief Add metadata about a new worker.
   *
   * @note The new worker is appended, so its logical ID is the value that
   *       getNumWorkers() returned before this call. It starts with zero
   *       queued workorders.
   *
   * @param cid The TMB client ID of the new worker.
   * @param numa_node_id The NUMA node ID where the new worker is pinned.
   **/
  inline void addWorker(const client_id cid, const int numa_node_id) {
    // Keep the count and the three per-worker vectors in lock step.
    ++num_workers_;
    num_queued_workorders_.push_back(0);
    numa_node_ids_.push_back(numa_node_id);
    client_ids_.push_back(cid);
  }

  /**
   * @brief Get information about the least loaded worker at the time the
   *        function is called.
   *
   * TODO(harshad) This method performs a linear search. Using a dynamic
   *               priority queue to track worker loads can help this method
   *               execute faster.
   * TODO(harshad) Extend this method to find least loaded workers on a given
   *               NUMA node.
   *
   * @note If there are multiple workers with the least amount of load, the
   *       worker with the smaller logical ID is returned.
   *
   * @return A pair: First element is the logical ID of the least loaded worker
   *         and second element is the number of queued workorders for this
   *         worker at the time the function is called.
   **/
  std::pair<std::size_t, std::size_t> getLeastLoadedWorker() const {
    // std::min_element yields the first smallest element, which provides the
    // smallest-logical-ID tie-break documented above.
    const auto min_element_iter = std::min_element(
        std::begin(num_queued_workorders_), std::end(num_queued_workorders_));
    DEBUG_ASSERT(min_element_iter != num_queued_workorders_.end());
    // The iterator never precedes begin(), so the distance is non-negative.
    const std::size_t least_loaded_worker_thread_index =
        static_cast<std::size_t>(
            std::distance(num_queued_workorders_.begin(), min_element_iter));
    return std::make_pair(least_loaded_worker_thread_index, *min_element_iter);
  }

  /**
   * @brief Get information about the most loaded worker at the time the
   *        function is called.
   *
   * TODO(harshad) This method performs a linear search. Using a dynamic
   *               priority queue to track worker loads can help this method
   *               execute faster.
   * TODO(harshad) Extend this method to find most loaded workers on a given
   *               NUMA node.
   *
   * @note If there are multiple workers with the maximum amount of load, the
   *       worker with the smaller logical ID is returned.
   *
   * @return A pair: First element is the logical ID of the most loaded worker
   *         and second element is the number of queued workorders for this
   *         worker at the time the function is called.
   **/
  std::pair<std::size_t, std::size_t> getMostLoadedWorker() const {
    // std::max_element yields the first largest element, which provides the
    // smallest-logical-ID tie-break documented above.
    const auto max_element_iter = std::max_element(
        std::begin(num_queued_workorders_), std::end(num_queued_workorders_));
    DEBUG_ASSERT(max_element_iter != num_queued_workorders_.end());
    // The iterator never precedes begin(), so the distance is non-negative.
    const std::size_t most_loaded_worker_thread_index =
        static_cast<std::size_t>(
            std::distance(num_queued_workorders_.begin(), max_element_iter));
    return std::make_pair(most_loaded_worker_thread_index, *max_element_iter);
  }

 private:
  // The number of workers tracked; the three vectors below are indexed by
  // logical worker ID in the range [0, num_workers_).
  std::size_t num_workers_;

  // The number of WorkOrders queued for execution per worker.
  std::vector<std::size_t> num_queued_workorders_;

  // The NUMA node IDs where the workers are pinned to. If a worker is not
  // pinned, the ID is -1.
  std::vector<int> numa_node_ids_;

  // The TMB client IDs of the workers.
  std::vector<client_id> client_ids_;

  DISALLOW_COPY_AND_ASSIGN(WorkerDirectory);
};
/** @} */
} // namespace quickstep
#endif // QUICKSTEP_QUERY_EXECUTION_WORKER_DIRECTORY_HPP_