5 * Created by Tamas Jung on 4/7/10.
9 #include "arc_submit.h"
11 // -*- indent-tabs-mode: nil -*-
19 #include <sys/types.h>
// Forward declarations of the two file-local helpers used by arc_submit().
// submit(): hands the job descriptions to the given services; returns a C string
// (the definition below returns HandleSubmittedJobs::getJobID()).
27 static char* submit(const Arc::UserConfig& usercfg, const std::list<Arc::JobDescription>& jobdescriptionlist, std::list<Arc::Endpoint>& services, const std::string& requestedSubmissionInterface, const std::string& jobidfile, bool direct_submission);
// dumpjobdescription(): prints the job descriptions to stdout instead of submitting them.
28 static int dumpjobdescription(const Arc::UserConfig& usercfg, const std::list<Arc::JobDescription>& jobdescriptionlist, const std::list<Arc::Endpoint>& services, const std::string& requestedSubmissionInterface);
// Entry point that drives a job submission using the "arcsub" option set
// (ClientOptions::CO_SUB) and help text.
//
// Steps visible here: parse argc/argv, load the user configuration, gather job
// descriptions from files and from inline strings, resolve the target services,
// then either dump the descriptions (opt.dumpdescription) or submit them.
//
// @param argc  argument count, forwarded to ClientOptions::Parse
// @param argv  argument vector; argv[0] is also passed to Arc::ArcLocation::Init
// @return on the submission path, whatever submit() returns.
// NOTE(review): the values returned on the early-exit paths (version, plugin
// listing, errors, dump) are not visible in this view -- confirm with callers.
30 char* arc_submit(int argc, char **argv) {
31 Arc::ArcLocation::Init(argv[0]);
33 ClientOptions opt(ClientOptions::CO_SUB,
34 istring("[filename ...]"),
35 istring("The arcsub command is used for "
36 "submitting jobs to Grid enabled "
37 "computing\nresources."));
// Parse() returns the leftover positional parameters; they are appended to the
// list of job description files further down.
39 std::list<std::string> params = opt.Parse(argc, argv);
41 if (opt.showversion) {
42 std::cout << Arc::IString("%s version %s", "arcsub", "4.1.0")
47 // If debug is specified as argument, it should be set before loading the configuration.
48 if (!opt.debug.empty())
49 Arc::Logger::getRootLogger().setThreshold(Arc::istring_to_level(opt.debug));
51 logger.msg(Arc::VERBOSE, "Running command: %s", opt.GetCommandWithArguments());
// Build the user configuration from the config file and job list named in the options.
53 Arc::UserConfig usercfg(opt.conffile, opt.joblist);
55 logger.msg(Arc::ERROR, "Failed configuration initialization");
// Plugin listing mode: show the plugin kinds below via showplugins().
59 if (opt.show_plugins) {
60 std::list<std::string> types;
61 types.push_back("HED:SubmitterPlugin");
62 types.push_back("HED:ServiceEndpointRetrieverPlugin");
63 types.push_back("HED:TargetInformationRetrieverPlugin");
64 types.push_back("HED:JobDescriptionParserPlugin");
65 types.push_back("HED:BrokerPlugin");
66 showplugins("arcsub", types, logger, usercfg.Broker().first);
70 if (!checkproxy(usercfg)) {
// The configured verbosity is applied only when no debug level came from the command line.
74 if (opt.debug.empty() && !usercfg.Verbosity().empty())
75 Arc::Logger::getRootLogger().setThreshold(Arc::istring_to_level(usercfg.Verbosity()));
78 usercfg.Timeout(opt.timeout);
// A broker named in the options replaces the one from the configuration.
80 if (!opt.broker.empty())
81 usercfg.Broker(opt.broker);
// Positional parameters are treated as additional job description files.
83 opt.jobdescriptionfiles.insert(opt.jobdescriptionfiles.end(),
84 params.begin(), params.end());
86 if (opt.jobdescriptionfiles.empty() && opt.jobdescriptionstrings.empty()) {
87 logger.msg(Arc::ERROR, "No job description input specified");
// Accumulates every successfully parsed description from both input sources.
91 std::list<Arc::JobDescription> jobdescriptionlist;
93 // Loop over input job description files
94 for (std::list<std::string>::iterator it = opt.jobdescriptionfiles.begin();
95 it != opt.jobdescriptionfiles.end(); it++) {
97 std::ifstream descriptionfile(it->c_str());
99 if (!descriptionfile) {
100 logger.msg(Arc::ERROR, "Can not open job description file: %s", *it);
// Determine the file size by seeking to the end, then rewind and read it whole.
// NOTE(review): tellg() reports -1 on failure and no check of `length` is visible
// before it is used as an allocation size below -- confirm.
104 descriptionfile.seekg(0, std::ios::end);
105 std::streamsize length = descriptionfile.tellg();
106 descriptionfile.seekg(0, std::ios::beg);
// One extra byte for the terminating NUL written after the read.
// NOTE(review): `buffer` comes from new[]; confirm every exit path from this loop
// body releases it with delete[] (no release is visible in this view).
108 char *buffer = new char[length + 1];
109 descriptionfile.read(buffer, length);
110 descriptionfile.close();
112 buffer[length] = '\0';
113 std::list<Arc::JobDescription> jobdescs;
114 Arc::JobDescriptionResult parseres = Arc::JobDescription::Parse((std::string)buffer, jobdescs);
// Propagate the dry-run flag to each parsed description and to its alternatives.
116 for (std::list<Arc::JobDescription>::iterator itJ = jobdescs.begin();
117 itJ != jobdescs.end(); itJ++) {
118 itJ->Application.DryRun = opt.dryrun;
119 for (std::list<Arc::JobDescription>::iterator itJAlt = itJ->GetAlternatives().begin();
120 itJAlt != itJ->GetAlternatives().end(); itJAlt++) {
121 itJAlt->Application.DryRun = opt.dryrun;
125 jobdescriptionlist.insert(jobdescriptionlist.end(), jobdescs.begin(), jobdescs.end());
// Failure reporting: the raw file content goes to stdout, the parser message to the log.
128 logger.msg(Arc::ERROR, "Invalid JobDescription:");
129 std::cout << buffer << std::endl;
131 logger.msg(Arc::ERROR, "Parsing error:\n%s", parseres.str());
137 //Loop over job description input strings
138 for (std::list<std::string>::iterator it = opt.jobdescriptionstrings.begin();
139 it != opt.jobdescriptionstrings.end(); it++) {
141 std::list<Arc::JobDescription> jobdescs;
142 if (Arc::JobDescription::Parse(*it, jobdescs)) {
// Same dry-run propagation as for descriptions read from files.
143 for (std::list<Arc::JobDescription>::iterator itJ = jobdescs.begin();
144 itJ != jobdescs.end(); itJ++) {
145 itJ->Application.DryRun = opt.dryrun;
146 for (std::list<Arc::JobDescription>::iterator itJAlt = itJ->GetAlternatives().begin();
147 itJAlt != itJ->GetAlternatives().end(); itJAlt++) {
148 itJAlt->Application.DryRun = opt.dryrun;
152 jobdescriptionlist.insert(jobdescriptionlist.end(), jobdescs.begin(), jobdescs.end());
155 logger.msg(Arc::ERROR, "Invalid JobDescription:");
156 std::cout << *it << std::endl;
// Target endpoints are derived from the configuration plus the index/cluster options.
161 std::list<Arc::Endpoint> services = getServicesFromUserConfigAndCommandLine(usercfg, opt.indexurls, opt.clusters, opt.requestedSubmissionInterfaceName, opt.infointerface);
// Rejected discovery URLs are registered only when submission is not direct.
163 if (!opt.direct_submission) {
164 usercfg.AddRejectDiscoveryURLs(opt.rejectdiscovery);
// Dump mode: print the descriptions via dumpjobdescription().
167 if (opt.dumpdescription) {
168 dumpjobdescription(usercfg, jobdescriptionlist, services, opt.requestedSubmissionInterfaceName);
172 return submit(usercfg, jobdescriptionlist, services, opt.requestedSubmissionInterfaceName, opt.jobidoutfile, opt.direct_submission);
// Consumer of Arc::Job entities that records each submitted job, prints its ID,
// and can write the collected jobs to the job ID file and to the job information
// storage, as well as print a submission summary.
175 class HandleSubmittedJobs : public Arc::EntityConsumer<Arc::Job> {
// @param jobidfile  path of the file to receive job IDs; empty means "do not write"
// @param uc         user configuration, stored by reference (see member `uc`)
177 HandleSubmittedJobs(const std::string& jobidfile, const Arc::UserConfig& uc) : jobidfile(jobidfile), uc(uc), submittedJobs() {}
// Called for each submitted job: echoes the job ID to stdout and keeps a copy.
179 void addEntity(const Arc::Job& j) {
180 std::cout << Arc::IString("Job submitted with jobid: %s", j.JobID) << std::endl;
181 submittedJobs.push_back(j);
// Persist the collected jobs: first the plain job ID file (a failure is only a
// warning), then the job information storage created from the configuration.
185 if (!jobidfile.empty() && !Arc::Job::WriteJobIDsToFile(submittedJobs, jobidfile)) {
186 logger.msg(Arc::WARNING, "Cannot write job IDs to file (%s)", jobidfile);
// NOTE(review): ownership of the returned jobStore pointer is not visible here --
// confirm whether it has to be deleted after use.
188 Arc::JobInformationStorage* jobStore = createJobInformationStorage(uc);
189 if (jobStore == NULL || !jobStore->Write(submittedJobs)) {
// Distinguish "storage could not be created" from "write failed" in the warning text.
190 if (jobStore == NULL) {
191 std::cerr << Arc::IString("Warning: Unable to open job list file (%s), unknown format", uc.JobListFile()) << std::endl;
194 std::cerr << Arc::IString("Warning: Failed to write job information to file (%s)", uc.JobListFile()) << std::endl;
196 std::cerr << " " << Arc::IString("To recover missing jobs, run arcsync") << std::endl;
// Prints a summary, but only when more than one description was supplied.
// @param originalDescriptions  the full list of descriptions that was submitted
// @param notsubmitted          pointers to the descriptions that were not submitted;
//                              they are matched by address against originalDescriptions
201 void printsummary(const std::list<Arc::JobDescription>& originalDescriptions, const std::list<const Arc::JobDescription*>& notsubmitted) const {
202 if (originalDescriptions.size() > 1) {
203 std::cout << std::endl << Arc::IString("Job submission summary:") << std::endl;
204 std::cout << "-----------------------" << std::endl;
// The total is computed as submitted + not submitted, not from originalDescriptions.size().
205 std::cout << Arc::IString("%d of %d jobs were submitted", submittedJobs.size(), submittedJobs.size()+notsubmitted.size()) << std::endl;
206 if (!notsubmitted.empty()) {
207 std::cout << Arc::IString("The following %d were not submitted", notsubmitted.size()) << std::endl;
208 for (std::list<const Arc::JobDescription*>::const_iterator it = notsubmitted.begin();
209 it != notsubmitted.end(); ++it) {
// Walk the original list, advancing jobnr alongside, until the element whose
// address equals the unsubmitted pointer is found; print its number and name.
211 for (std::list<Arc::JobDescription>::const_iterator itOrig = originalDescriptions.begin();
212 itOrig != originalDescriptions.end(); ++itOrig, ++jobnr) {
213 if (&(*itOrig) == *it) {
214 std::cout << Arc::IString("Job nr.") << " " << jobnr;
215 if (!(*it)->Identification.JobName.empty()) {
216 std::cout << ": " << (*it)->Identification.JobName;
218 std::cout << std::endl;
// Forget all jobs recorded so far.
227 void clearsubmittedjobs() { submittedJobs.clear(); }
// Returns a strdup()'ed copy of the most recently recorded job ID; the caller is
// responsible for free()-ing it.
// NOTE(review): back() on an empty list is undefined behavior -- confirm callers
// only reach this after at least one job was recorded.
230 return strdup(submittedJobs.back().JobID.c_str());
234 const std::string jobidfile;
// Held by reference: the UserConfig passed to the constructor must outlive this object.
235 const Arc::UserConfig& uc;
236 std::list<Arc::Job> submittedJobs;
// Submits the given job descriptions and reports problems on stderr / the logger.
//
// @param usercfg                       user configuration used for the Submitter and job bookkeeping
// @param jobdescriptionlist            descriptions to submit
// @param services                      candidate endpoints; in direct mode this list is filtered
//                                      and modified in place
// @param requestedSubmissionInterface  interface name to restrict submission to; may be empty
// @param jobidfile                     file to receive job IDs (passed to HandleSubmittedJobs)
// @param direct_submission             true: submit straight to `services`;
//                                      false: brokered submission
// @return the value of HandleSubmittedJobs::getJobID() on the path visible here.
240 static char* submit(const Arc::UserConfig& usercfg, const std::list<Arc::JobDescription>& jobdescriptionlist, std::list<Arc::Endpoint>& services, const std::string& requestedSubmissionInterface, const std::string& jobidfile, bool direct_submission) {
243 HandleSubmittedJobs hsj(jobidfile, usercfg);
244 Arc::Submitter s(usercfg);
247 Arc::SubmissionStatus status;
// Brokered mode: the requested interface (if any) is passed along as a one-element list.
248 if (!direct_submission) {
249 std::list<std::string> rsi;
250 if (!requestedSubmissionInterface.empty()) rsi.push_back(requestedSubmissionInterface);
251 status = s.BrokeredSubmit(services, jobdescriptionlist, rsi);
// Direct mode: when an interface was requested, drop endpoints that name a
// different one and stamp the requested name onto endpoints.
254 if (!requestedSubmissionInterface.empty()) {
255 for (std::list<Arc::Endpoint>::iterator it = services.begin(); it != services.end();) {
256 // Remove endpoint - it has an unrequested interface name.
257 if (!it->InterfaceName.empty() && it->InterfaceName != requestedSubmissionInterface) {
258 logger.msg(Arc::INFO, "Removing endpoint %s: It has an unrequested interface (%s).", it->URLString, it->InterfaceName);
// erase() yields the iterator to the next element, which is why the for-header has no increment.
259 it = services.erase(it);
263 it->InterfaceName = requestedSubmissionInterface;
267 status = s.Submit(services, jobdescriptionlist);
// Translate each status flag that is set into a user-facing message.
271 if (status.isSet(Arc::SubmissionStatus::BROKER_PLUGIN_NOT_LOADED)) {
272 std::cerr << Arc::IString("ERROR: Unable to load broker %s", usercfg.Broker().first) << std::endl;
275 if (status.isSet(Arc::SubmissionStatus::NO_SERVICES)) {
276 std::cerr << Arc::IString("ERROR: Job submission aborted because no resource returned any information") << std::endl;
279 if (status.isSet(Arc::SubmissionStatus::DESCRIPTION_NOT_SUBMITTED)) {
280 std::cerr << Arc::IString("ERROR: One or multiple job descriptions was not submitted.") << std::endl;
283 if (status.isSet(Arc::SubmissionStatus::SUBMITTER_PLUGIN_NOT_LOADED)) {
// Look for a GridFTP job endpoint whose submitter plugin was missing, so that a
// targeted installation hint can be logged.
// NOTE(review): begin() and end() come from two separate calls to
// GetEndpointSubmissionStatuses(); this is only valid if it returns a reference -- confirm.
284 bool gridFTPJobPluginFailed = false;
285 for (std::map<Arc::Endpoint, Arc::EndpointSubmissionStatus>::const_iterator it = s.GetEndpointSubmissionStatuses().begin();
286 it != s.GetEndpointSubmissionStatuses().end(); ++it) {
287 if (it->first.InterfaceName == "org.nordugrid.gridftpjob" && it->second == Arc::EndpointSubmissionStatus::NOPLUGIN) {
288 gridFTPJobPluginFailed = true;
291 if (gridFTPJobPluginFailed) {
// Severity and continuation-line indent depend on retval.
// NOTE(review): the declaration and assignments of `retval` are not visible in this view.
292 Arc::LogLevel level = (retval == 1 ? Arc::ERROR : Arc::INFO);
293 std::string indent = (retval == 1 ? " " : " ");
294 logger.msg(level, "A computing resource using the GridFTP interface was requested, but\n"
295 "%sthe corresponding plugin could not be loaded. Is the plugin installed?\n"
296 "%sIf not, please install the package 'nordugrid-arc-plugins-globus'.\n"
297 "%sDepending on your type of installation the package name might differ.", indent, indent, indent);
299 // TODO: What to do when failing to load other plugins.
302 hsj.printsummary(jobdescriptionlist, s.GetDescriptionsNotSubmitted());
// NOTE(review): getJobID() reads the last recorded job -- confirm at least one job
// was submitted before this point is reached.
305 return hsj.getJobID();
// Prints, for each job description, the form it would take when sent to a
// matching computing resource, instead of submitting it.
//
// @param usercfg                       user configuration (info interface, broker, rejected URLs)
// @param jobdescriptionlist            descriptions to dump
// @param services                      endpoints queried for resource information
// @param requestedSubmissionInterface  when non-empty, targets with another interface name are skipped
// NOTE(review): the return statements are not visible in this view -- confirm the
// meaning of the int result with callers.
310 static int dumpjobdescription(const Arc::UserConfig& usercfg, const std::list<Arc::JobDescription>& jobdescriptionlist, const std::list<Arc::Endpoint>& services, const std::string& requestedSubmissionInterface) {
// Preferred information interface: the configured one, or LDAP GLUE2 when none is configured.
313 std::set<std::string> preferredInterfaceNames;
314 if (usercfg.InfoInterface().empty()) {
315 preferredInterfaceNames.insert("org.nordugrid.ldapglue2");
317 preferredInterfaceNames.insert(usercfg.InfoInterface());
// The retriever starts with an empty endpoint list; the given services are added
// one by one and the results are collected by the csu consumer.
320 Arc::ComputingServiceUniq csu;
321 Arc::ComputingServiceRetriever csr(usercfg, std::list<Arc::Endpoint>(), usercfg.RejectDiscoveryURLs(), preferredInterfaceNames);
322 csr.addConsumer(csu);
323 for (std::list<Arc::Endpoint>::const_iterator it = services.begin(); it != services.end(); it++) {
324 csr.addEndpoint(*it);
327 std::list<Arc::ComputingServiceType> CEs = csu.getServices();
// Fallback output: print every description unchanged, in its source language.
331 std::cout << Arc::IString("Unable to adapt job description to any resource, no resource information could be obtained.") << std::endl;
332 std::cout << Arc::IString("Original job description is listed below:") << std::endl;
333 for (std::list<Arc::JobDescription>::const_iterator it = jobdescriptionlist.begin();
334 it != jobdescriptionlist.end(); ++it) {
335 std::string descOutput;
336 it->UnParse(descOutput, it->GetSourceLanguage());
337 std::cout << descOutput << std::endl;
// The broker named in the configuration is used to order the execution targets.
342 Arc::Broker broker(usercfg, usercfg.Broker().first);
343 if (!broker.isValid(false)) {
344 logger.msg(Arc::ERROR, "Dumping job description aborted: Unable to load broker %s", usercfg.Broker().first);
348 Arc::ExecutionTargetSorter ets(broker, CEs);
349 std::list<Arc::JobDescription>::const_iterator itJAlt; // Iterator to use for alternative job descriptions.
350 for (std::list<Arc::JobDescription>::const_iterator itJ = jobdescriptionlist.begin();
351 itJ != jobdescriptionlist.end(); ++itJ) {
// currentJobDesc starts at the main description and may later point at an alternative.
352 const Arc::JobDescription* currentJobDesc = &*itJ;
353 bool descriptionDumped = false;
// Work on a copy so that Prepare() does not modify the caller's description.
355 Arc::JobDescription jobdescdump(*currentJobDesc);
356 ets.set(jobdescdump);
// Iterate over the targets held by the sorter.
358 for (ets.reset(); !ets.endOfList(); ets.next()) {
359 if(!requestedSubmissionInterface.empty() && ets->ComputingEndpoint->InterfaceName != requestedSubmissionInterface) continue;
360 if (!jobdescdump.Prepare(*ets)) {
361 logger.msg(Arc::INFO, "Unable to prepare job description according to needs of the target resource (%s).", ets->ComputingEndpoint->URLString);
// Output language is chosen from the target's interface name; JSDL is the default.
365 std::string jobdesclang = "nordugrid:jsdl";
366 if (ets->ComputingEndpoint->InterfaceName == "org.nordugrid.gridftpjob") {
367 jobdesclang = "nordugrid:xrsl";
369 else if (ets->ComputingEndpoint->InterfaceName == "org.glite.ce.cream") {
370 jobdesclang = "egee:jdl";
372 else if (ets->ComputingEndpoint->InterfaceName == "org.ogf.glue.emies.activitycreation") {
373 jobdesclang = "emies:adl";
376 if (!jobdescdump.UnParse(jobdesc, jobdesclang)) {
377 logger.msg(Arc::INFO, "An error occurred during the generation of job description to be sent to %s", ets->ComputingEndpoint->URLString);
381 std::cout << Arc::IString("Job description to be sent to %s:", ets->AdminDomain->Name) << std::endl;
382 std::cout << jobdesc << std::endl;
383 descriptionDumped = true;
// Nothing dumped for the current description: move on to the alternatives, starting
// with the first one when the main description was the one just tried.
387 if (!descriptionDumped && itJ->HasAlternatives()) { // Alternative job descriptions.
388 if (currentJobDesc == &*itJ) {
389 itJAlt = itJ->GetAlternatives().begin();
394 currentJobDesc = &*itJAlt;
// Repeat until something was dumped or the alternatives are exhausted.
396 } while (!descriptionDumped && itJ->HasAlternatives() && itJAlt != itJ->GetAlternatives().end());
// Report that no target accepted this description when the target list was exhausted.
398 if (ets.endOfList()) {
399 std::cout << Arc::IString("Unable to prepare job description according to needs of the target resource.") << std::endl;
402 } //end loop over all job descriptions