SAMZA-1329: Switch SamzaTaskProxy to use LocalityManager.
[samza.git] / samza-rest / src / main / java / org / apache / samza / rest / proxy / task / SamzaTaskProxy.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.samza.rest.proxy.task;
20
21 import com.google.common.base.Preconditions;
22 import com.google.common.collect.ImmutableList;
23 import com.google.common.collect.ImmutableMap;
24 import java.io.IOException;
25 import java.net.URI;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.stream.Collectors;
30 import org.apache.samza.SamzaException;
31 import org.apache.samza.config.Config;
32 import org.apache.samza.config.ConfigFactory;
33 import org.apache.samza.config.JobConfig;
34 import org.apache.samza.config.MapConfig;
35 import org.apache.samza.config.StorageConfig;
36 import org.apache.samza.container.LocalityManager;
37 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
38 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
39 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
40 import org.apache.samza.metrics.MetricsRegistryMap;
41 import org.apache.samza.rest.model.Task;
42 import org.apache.samza.rest.proxy.installation.InstallationFinder;
43 import org.apache.samza.rest.proxy.installation.InstallationRecord;
44 import org.apache.samza.rest.proxy.job.JobInstance;
45 import org.apache.samza.util.ClassLoaderHelper;
46 import org.apache.samza.util.Util;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import scala.collection.JavaConverters;
50
51 /**
52 * {@link TaskProxy} interface implementation for samza jobs running in yarn execution environment.
53 * getTasks implementation reads the jobModel of the job specified by {@link JobInstance} from coordinator stream.
54 */
55 public class SamzaTaskProxy implements TaskProxy {
56
57 private static final Logger LOG = LoggerFactory.getLogger(SamzaTaskProxy.class);
58
59 private static final MetricsRegistryMap METRICS_REGISTRY = new MetricsRegistryMap();
60
61 private final TaskResourceConfig taskResourceConfig;
62
63 private final InstallationFinder installFinder;
64
65 public SamzaTaskProxy(TaskResourceConfig taskResourceConfig, InstallationFinder installFinder) {
66 this.taskResourceConfig = taskResourceConfig;
67 this.installFinder = installFinder;
68 }
69
70 /**
71 * Fetches the complete job model from the coordinator stream based upon the provided {@link JobInstance}
72 * param, transforms it to a list of {@link Task} and returns it.
73 * {@inheritDoc}
74 */
75 @Override
76 public List<Task> getTasks(JobInstance jobInstance) throws IOException, InterruptedException {
77 Preconditions.checkArgument(installFinder.isInstalled(jobInstance),
78 String.format("Invalid job instance : %s", jobInstance));
79 CoordinatorStreamSystemConsumer coordinatorStreamSystemConsumer = null;
80 try {
81 coordinatorStreamSystemConsumer = initializeCoordinatorStreamConsumer(jobInstance);
82 return readTasksFromCoordinatorStream(coordinatorStreamSystemConsumer);
83 } finally {
84 if (coordinatorStreamSystemConsumer != null) {
85 coordinatorStreamSystemConsumer.stop();
86 }
87 }
88 }
89
90 /**
91 * Initialize {@link CoordinatorStreamSystemConsumer} based upon {@link JobInstance} parameter.
92 * @param jobInstance the job instance to get CoordinatorStreamSystemConsumer for.
93 * @return built and initialized CoordinatorStreamSystemConsumer.
94 */
95 protected CoordinatorStreamSystemConsumer initializeCoordinatorStreamConsumer(JobInstance jobInstance) {
96 CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory();
97 Config coordinatorSystemConfig = getCoordinatorSystemConfig(jobInstance);
98 LOG.debug("Using config: {} to create coordinatorStream consumer.", coordinatorSystemConfig);
99 CoordinatorStreamSystemConsumer consumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, METRICS_REGISTRY);
100 LOG.debug("Registering coordinator system stream consumer.");
101 consumer.register();
102 LOG.debug("Starting coordinator system stream consumer.");
103 consumer.start();
104 LOG.debug("Bootstrapping coordinator system stream consumer.");
105 consumer.bootstrap();
106 return consumer;
107 }
108
109 /**
110 * Builds coordinator system config for the {@param jobInstance}.
111 * @param jobInstance the job instance to get the jobModel for.
112 * @return the constructed coordinator system config.
113 */
114 private Config getCoordinatorSystemConfig(JobInstance jobInstance) {
115 try {
116 InstallationRecord record = installFinder.getAllInstalledJobs().get(jobInstance);
117 ConfigFactory configFactory = ClassLoaderHelper.fromClassName(taskResourceConfig.getJobConfigFactory());
118 Config config = configFactory.getConfig(new URI(String.format("file://%s", record.getConfigFilePath())));
119 Map<String, String> configMap = ImmutableMap.of(JobConfig.JOB_ID(), jobInstance.getJobId(),
120 JobConfig.JOB_NAME(), jobInstance.getJobName());
121 return Util.buildCoordinatorStreamConfig(new MapConfig(ImmutableList.of(config, configMap)));
122 } catch (Exception e) {
123 LOG.error(String.format("Failed to get coordinator stream config for job : %s", jobInstance), e);
124 throw new SamzaException(e);
125 }
126 }
127
128 /**
129 * Builds list of {@link Task} from job model in coordinator stream.
130 * @param consumer system consumer associated with a job's coordinator stream.
131 * @return list of {@link Task} constructed from job model in coordinator stream.
132 */
133 protected List<Task> readTasksFromCoordinatorStream(CoordinatorStreamSystemConsumer consumer) {
134 LocalityManager localityManager = new LocalityManager(null, consumer);
135 Map<String, Map<String, String>> containerIdToHostMapping = localityManager.readContainerLocality();
136 Map<String, String> taskNameToContainerIdMapping = localityManager.getTaskAssignmentManager().readTaskAssignment();
137 StorageConfig storageConfig = new StorageConfig(consumer.getConfig());
138 List<String> storeNames = JavaConverters.seqAsJavaListConverter(storageConfig.getStoreNames()).asJava();
139 return taskNameToContainerIdMapping.entrySet()
140 .stream()
141 .map(entry -> {
142 String hostName = containerIdToHostMapping.get(entry.getValue()).get(SetContainerHostMapping.HOST_KEY);
143 return new Task(hostName, entry.getKey(), entry.getValue(), new ArrayList<>(), storeNames);
144 }).collect(Collectors.toList());
145 }
146 }