11
11
12
12
import java .io .*;
13
13
14
+ import java .lang .reflect .Field ;
15
+ import java .lang .ReflectiveOperationException ;
16
+
17
+ import java .util .concurrent .BlockingQueue ;
18
+ import java .util .concurrent .Executors ;
19
+ import java .util .concurrent .ScheduledExecutorService ;
20
+ import java .util .concurrent .TimeUnit ;
21
+
14
22
public class MesosExecutor implements Executor {
15
23
public static final Log LOG = LogFactory .getLog (MesosExecutor .class );
16
24
private SlaveInfo slaveInfo ;
17
25
private TaskTracker taskTracker ;
18
26
27
+ protected final ScheduledExecutorService timerScheduler =
28
+ Executors .newScheduledThreadPool (1 );
29
+
19
30
public static void main (String [] args ) {
20
31
MesosExecutorDriver driver = new MesosExecutorDriver (new MesosExecutor ());
21
32
System .exit (driver .run () == Status .DRIVER_STOPPED ? 0 : 1 );
@@ -37,10 +48,8 @@ private JobConf configure(final TaskInfo task) {
37
48
conf .writeXml (writer );
38
49
writer .flush ();
39
50
String xml = writer .getBuffer ().toString ();
40
- String xmlFormatted =
41
- org .apache .mesos .hadoop .Utils .formatXml (xml );
42
51
LOG .info ("XML Configuration received:\n " +
43
- xmlFormatted );
52
+ org . apache . mesos . hadoop . Utils . formatXml ( xml ) );
44
53
} catch (Exception e ) {
45
54
LOG .warn ("Failed to output configuration as XML." , e );
46
55
}
@@ -123,14 +132,22 @@ public void run() {
123
132
}
124
133
125
134
@ Override
126
- public void killTask (ExecutorDriver driver , TaskID taskId ) {
135
+ public void killTask (final ExecutorDriver driver , final TaskID taskId ) {
127
136
LOG .info ("Killing task : " + taskId .getValue ());
128
- try {
129
- taskTracker .shutdown ();
130
- } catch (IOException e ) {
131
- LOG .error ("Failed to shutdown TaskTracker" , e );
132
- } catch (InterruptedException e ) {
133
- LOG .error ("Failed to shutdown TaskTracker" , e );
137
+ if (taskTracker != null ) {
138
+ LOG .info ("Revoking task tracker map/reduce slots" );
139
+ revokeSlots ();
140
+
141
+ // Send the TASK_FINISHED status
142
+ new Thread ("TaskFinishedUpdate" ) {
143
+ @ Override
144
+ public void run () {
145
+ driver .sendStatusUpdate (TaskStatus .newBuilder ()
146
+ .setTaskId (taskId )
147
+ .setState (TaskState .TASK_FINISHED )
148
+ .build ());
149
+ }
150
+ }.start ();
134
151
}
135
152
}
136
153
@@ -159,4 +176,96 @@ public void error(ExecutorDriver d, String message) {
159
176
public void shutdown (ExecutorDriver d ) {
160
177
LOG .info ("Executor asked to shutdown" );
161
178
}
179
+
180
+ public void revokeSlots () {
181
+ if (taskTracker == null ) {
182
+ LOG .error ("Task tracker is not initialized" );
183
+ return ;
184
+ }
185
+
186
+ int maxMapSlots = 0 ;
187
+ int maxReduceSlots = 0 ;
188
+
189
+ // TODO(tarnfeld): Sanity check that it's safe for us to change the slots.
190
+ // Be sure there's nothing running and nothing in the launcher queue.
191
+
192
+ // If we expect to have no slots, let's go ahead and terminate the task launchers
193
+ if (maxMapSlots == 0 ) {
194
+ try {
195
+ Field launcherField = taskTracker .getClass ().getDeclaredField ("mapLauncher" );
196
+ launcherField .setAccessible (true );
197
+
198
+ // Kill the current map task launcher
199
+ TaskTracker .TaskLauncher launcher = ((TaskTracker .TaskLauncher ) launcherField .get (taskTracker ));
200
+ launcher .notifySlots ();
201
+ launcher .interrupt ();
202
+ } catch (ReflectiveOperationException e ) {
203
+ LOG .fatal ("Failed updating map slots due to error with reflection" , e );
204
+ }
205
+ }
206
+
207
+ if (maxReduceSlots == 0 ) {
208
+ try {
209
+ Field launcherField = taskTracker .getClass ().getDeclaredField ("reduceLauncher" );
210
+ launcherField .setAccessible (true );
211
+
212
+ // Kill the current reduce task launcher
213
+ TaskTracker .TaskLauncher launcher = ((TaskTracker .TaskLauncher ) launcherField .get (taskTracker ));
214
+ launcher .notifySlots ();
215
+ launcher .interrupt ();
216
+ } catch (ReflectiveOperationException e ) {
217
+ LOG .fatal ("Failed updating reduce slots due to error with reflection" , e );
218
+ }
219
+ }
220
+
221
+ // Configure the new slot counts on the task tracker
222
+ taskTracker .setMaxMapSlots (maxMapSlots );
223
+ taskTracker .setMaxReduceSlots (maxReduceSlots );
224
+
225
+ // If we have zero slots left, commit suicide when no jobs are running
226
+ if ((maxMapSlots + maxReduceSlots ) == 0 ) {
227
+ scheduleSuicideTimer ();
228
+ }
229
+ }
230
+
231
+ protected void scheduleSuicideTimer () {
232
+ timerScheduler .schedule (new Runnable () {
233
+ @ Override
234
+ public void run () {
235
+ if (taskTracker == null ) {
236
+ return ;
237
+ }
238
+
239
+ LOG .info ("Checking to see if TaskTracker is idle" );
240
+
241
+ // If the task tracker is idle, all tasks have finished and task output
242
+ // has been cleaned up.
243
+ if (taskTracker .isIdle ()) {
244
+ LOG .warn ("TaskTracker is idle, terminating" );
245
+
246
+ try {
247
+ taskTracker .shutdown ();
248
+ } catch (IOException e ) {
249
+ LOG .error ("Failed to shutdown TaskTracker" , e );
250
+ } catch (InterruptedException e ) {
251
+ LOG .error ("Failed to shutdown TaskTracker" , e );
252
+ }
253
+ }
254
+ else {
255
+ try {
256
+ Field field = taskTracker .getClass ().getDeclaredField ("tasksToCleanup" );
257
+ field .setAccessible (true );
258
+ BlockingQueue <TaskTrackerAction > tasksToCleanup = ((BlockingQueue <TaskTrackerAction >) field .get (taskTracker ));
259
+ LOG .info ("TaskTracker has " + taskTracker .tasks .size () +
260
+ " running tasks and " + tasksToCleanup +
261
+ " tasks to clean up." );
262
+ } catch (ReflectiveOperationException e ) {
263
+ LOG .fatal ("Failed to get task counts from TaskTracker" , e );
264
+ }
265
+
266
+ scheduleSuicideTimer ();
267
+ }
268
+ }
269
+ }, 1000 , TimeUnit .MILLISECONDS );
270
+ }
162
271
}
0 commit comments