-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathQueryManagerSingleNode.hpp
154 lines (126 loc) · 5.16 KB
/
QueryManagerSingleNode.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_
#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_
#include <cstddef>
#include <memory>
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionState.hpp"
#include "query_execution/QueryManagerBase.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "utility/Macros.hpp"
#include "tmb/id_typedefs.h"
namespace tmb { class MessageBus; }
namespace quickstep {
class CatalogDatabase;
class CatalogDatabaseLite;
class QueryHandle;
class StorageManager;
class WorkerMessage;
/** \addtogroup QueryExecution
* @{
*/
/**
 * @brief Manages the execution of a single query: generates new work orders
 *        and keeps track of the query execution state.
 **/
class QueryManagerSingleNode final : public QueryManagerBase {
 public:
  /**
   * @brief Constructor.
   *
   * @param foreman_client_id The TMB client ID of the foreman thread.
   * @param num_numa_nodes The number of NUMA nodes used by the system.
   * @param query_handle The QueryHandle object for this query.
   * @param catalog_database The CatalogDatabase used by the query.
   * @param storage_manager The StorageManager used by the query.
   * @param bus The TMB used for communication.
   **/
  QueryManagerSingleNode(const tmb::client_id foreman_client_id,
                         const std::size_t num_numa_nodes,
                         QueryHandle *query_handle,
                         CatalogDatabaseLite *catalog_database,
                         StorageManager *storage_manager,
                         tmb::MessageBus *bus);

  /**
   * @brief Destructor. There is nothing to release by hand; the
   *        std::unique_ptr members clean up what they hold.
   **/
  ~QueryManagerSingleNode() override = default;

  // Overrides the base-class hook; the definition is not in this header.
  bool fetchNormalWorkOrders(const dag_node_index index) override;

  /**
   * @brief Get the next WorkOrder to be executed, wrapped in a WorkerMessage.
   *
   * @param start_operator_index The search for a schedulable WorkOrder begins
   *        with the operator at this index.
   * @param node_id The NUMA node from which the next WorkOrder should
   *        preferably take its input(s). This is only a hint, not a binding
   *        requirement.
   *
   * @return A pointer to the WorkerMessage, or nullptr when there is no
   *         WorkOrder to be executed.
   **/
  WorkerMessage* getNextWorkerMessage(
      const dag_node_index start_operator_index,
      const numa_node_id node_id = kAnyNUMANodeID);

  /**
   * @brief Get a mutable pointer to the QueryContext held by this manager.
   **/
  QueryContext* getQueryContextMutable() {
    return query_context_.get();
  }

  // Overrides the base-class hook; the definition is not in this header.
  std::size_t getQueryMemoryConsumptionBytes() const override;

 private:
  /**
   * @brief Check whether normal execution of an operator is over: no normal
   *        WorkOrder is left in the container, none is queued, and the
   *        operator is done generating WorkOrders.
   *
   * @param index The index of the operator in the query plan DAG.
   **/
  bool checkNormalExecutionOver(const dag_node_index index) const override {
    // The checks run in this order and stop at the first one that fails.
    if (workorders_container_->hasNormalWorkOrder(index)) {
      return false;
    }
    if (query_exec_state_->getNumQueuedWorkOrders(index) != 0) {
      return false;
    }
    return query_exec_state_->hasDoneGenerationWorkOrders(index);
  }

  // Overrides the base-class hook; the definition is not in this header.
  bool initiateRebuild(const dag_node_index index) override;

  /**
   * @brief Check whether the rebuild of an operator is over: no rebuild
   *        WorkOrder is left in the container and the execution state counts
   *        zero rebuild WorkOrders for the operator.
   *
   * @note The rebuild must already have been initiated for this operator
   *       (checked with DCHECK).
   *
   * @param index The index of the operator in the query plan DAG.
   **/
  bool checkRebuildOver(const dag_node_index index) const override {
    DCHECK(query_exec_state_->hasRebuildInitiated(index));
    if (workorders_container_->hasRebuildWorkOrder(index)) {
      return false;
    }
    return query_exec_state_->getNumRebuildWorkOrders(index) == 0;
  }

  /**
   * @brief Get the rebuild WorkOrders for an operator.
   *
   * @note Call this function only once, when all the normal WorkOrders
   *       generated by the operator have finished their execution.
   *
   * @param index The index of the operator in the query plan DAG.
   * @param container A pointer to the WorkOrdersContainer in which the
   *        generated WorkOrders are stored.
   *
   * @return The number of generated rebuild work orders.
   **/
  std::size_t getRebuildWorkOrders(const dag_node_index index,
                                   WorkOrdersContainer *container);

  /**
   * @brief Get the total memory (in bytes) occupied by temporary relations
   *        created during the query execution.
   **/
  std::size_t getTotalTempRelationMemoryInBytes() const;

  // NOTE: keep the declaration order of the data members below; members are
  // initialized in declaration order.
  const tmb::client_id foreman_client_id_;

  StorageManager *storage_manager_;
  tmb::MessageBus *bus_;

  std::unique_ptr<QueryContext> query_context_;
  std::unique_ptr<WorkOrdersContainer> workorders_container_;

  // NOTE(review): how this reference relates to the CatalogDatabaseLite
  // pointer taken by the constructor is not visible in this header -- see
  // the constructor definition.
  const CatalogDatabase &database_;

  DISALLOW_COPY_AND_ASSIGN(QueryManagerSingleNode);
};
/** @} */
} // namespace quickstep
#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_