Update to ARC 4.1.0
[gp-arc-client-c.git] / ext / arc_submit.cpp
1 /*
2  *  submit.cpp
3  *  arcclientc
4  *
5  *  Created by Tamas Jung on 4/7/10.
6  *
7  */
8
9 #include "arc_submit.h"
10 #include "logger.h"
11 // -*- indent-tabs-mode: nil -*-
12
13
14 #include "logger.h"
15 #include <fstream>
16 #include <iostream>
17 #include <list>
18 #include <string>
19 #include <sys/types.h>
20 #include <sys/stat.h>
21 #include <unistd.h>
22
23 #include "arc_libs.h"
24
25 #include "utils.h"
26
27 static char* submit(const Arc::UserConfig& usercfg, const std::list<Arc::JobDescription>& jobdescriptionlist, std::list<Arc::Endpoint>& services, const std::string& requestedSubmissionInterface, const std::string& jobidfile, bool direct_submission);
28 static int dumpjobdescription(const Arc::UserConfig& usercfg, const std::list<Arc::JobDescription>& jobdescriptionlist, const std::list<Arc::Endpoint>& services, const std::string& requestedSubmissionInterface);
29
30 char* arc_submit(int argc, char **argv) {
31   Arc::ArcLocation::Init(argv[0]);
32
33   ClientOptions opt(ClientOptions::CO_SUB,
34                     istring("[filename ...]"),
35                     istring("The arcsub command is used for "
36                             "submitting jobs to Grid enabled "
37                             "computing\nresources."));
38
39   std::list<std::string> params = opt.Parse(argc, argv);
40
41   if (opt.showversion) {
42     std::cout << Arc::IString("%s version %s", "arcsub", "4.1.0")
43               << std::endl;
44     return 0;
45   }
46
47   // If debug is specified as argument, it should be set before loading the configuration.
48   if (!opt.debug.empty())
49     Arc::Logger::getRootLogger().setThreshold(Arc::istring_to_level(opt.debug));
50
51   logger.msg(Arc::VERBOSE, "Running command: %s", opt.GetCommandWithArguments());
52
53   Arc::UserConfig usercfg(opt.conffile, opt.joblist);
54   if (!usercfg) {
55     logger.msg(Arc::ERROR, "Failed configuration initialization");
56     return NULL;
57   }
58
59   if (opt.show_plugins) {
60     std::list<std::string> types;
61     types.push_back("HED:SubmitterPlugin");
62     types.push_back("HED:ServiceEndpointRetrieverPlugin");
63     types.push_back("HED:TargetInformationRetrieverPlugin");
64     types.push_back("HED:JobDescriptionParserPlugin");
65     types.push_back("HED:BrokerPlugin");
66     showplugins("arcsub", types, logger, usercfg.Broker().first);
67     return 0;
68   }
69
70   if (!checkproxy(usercfg)) {
71     return NULL;
72   }
73
74   if (opt.debug.empty() && !usercfg.Verbosity().empty())
75     Arc::Logger::getRootLogger().setThreshold(Arc::istring_to_level(usercfg.Verbosity()));
76
77   if (opt.timeout > 0)
78     usercfg.Timeout(opt.timeout);
79
80   if (!opt.broker.empty())
81     usercfg.Broker(opt.broker);
82
83   opt.jobdescriptionfiles.insert(opt.jobdescriptionfiles.end(),
84                                  params.begin(), params.end());
85
86   if (opt.jobdescriptionfiles.empty() && opt.jobdescriptionstrings.empty()) {
87     logger.msg(Arc::ERROR, "No job description input specified");
88     return NULL;
89   }
90
91   std::list<Arc::JobDescription> jobdescriptionlist;
92
93   // Loop over input job description files
94   for (std::list<std::string>::iterator it = opt.jobdescriptionfiles.begin();
95        it != opt.jobdescriptionfiles.end(); it++) {
96
97     std::ifstream descriptionfile(it->c_str());
98
99     if (!descriptionfile) {
100       logger.msg(Arc::ERROR, "Can not open job description file: %s", *it);
101       return NULL;
102     }
103
104     descriptionfile.seekg(0, std::ios::end);
105     std::streamsize length = descriptionfile.tellg();
106     descriptionfile.seekg(0, std::ios::beg);
107
108     char *buffer = new char[length + 1];
109     descriptionfile.read(buffer, length);
110     descriptionfile.close();
111
112     buffer[length] = '\0';
113     std::list<Arc::JobDescription> jobdescs;
114     Arc::JobDescriptionResult parseres = Arc::JobDescription::Parse((std::string)buffer, jobdescs);
115     if (parseres) {
116       for (std::list<Arc::JobDescription>::iterator itJ = jobdescs.begin();
117            itJ != jobdescs.end(); itJ++) {
118         itJ->Application.DryRun = opt.dryrun;
119         for (std::list<Arc::JobDescription>::iterator itJAlt = itJ->GetAlternatives().begin();
120              itJAlt != itJ->GetAlternatives().end(); itJAlt++) {
121           itJAlt->Application.DryRun = opt.dryrun;
122         }
123       }
124
125       jobdescriptionlist.insert(jobdescriptionlist.end(), jobdescs.begin(), jobdescs.end());
126     }
127     else {
128       logger.msg(Arc::ERROR, "Invalid JobDescription:");
129       std::cout << buffer << std::endl;
130       delete[] buffer;
131       logger.msg(Arc::ERROR, "Parsing error:\n%s", parseres.str());
132       return NULL;
133     }
134     delete[] buffer;
135   }
136
137   //Loop over job description input strings
138   for (std::list<std::string>::iterator it = opt.jobdescriptionstrings.begin();
139        it != opt.jobdescriptionstrings.end(); it++) {
140
141     std::list<Arc::JobDescription> jobdescs;
142     if (Arc::JobDescription::Parse(*it, jobdescs)) {
143       for (std::list<Arc::JobDescription>::iterator itJ = jobdescs.begin();
144            itJ != jobdescs.end(); itJ++) {
145         itJ->Application.DryRun = opt.dryrun;
146         for (std::list<Arc::JobDescription>::iterator itJAlt = itJ->GetAlternatives().begin();
147              itJAlt != itJ->GetAlternatives().end(); itJAlt++) {
148           itJAlt->Application.DryRun = opt.dryrun;
149         }
150       }
151
152       jobdescriptionlist.insert(jobdescriptionlist.end(), jobdescs.begin(), jobdescs.end());
153     }
154     else {
155       logger.msg(Arc::ERROR, "Invalid JobDescription:");
156       std::cout << *it << std::endl;
157       return NULL;
158     }
159   }
160
161   std::list<Arc::Endpoint> services = getServicesFromUserConfigAndCommandLine(usercfg, opt.indexurls, opt.clusters, opt.requestedSubmissionInterfaceName, opt.infointerface);
162
163   if (!opt.direct_submission) {
164     usercfg.AddRejectDiscoveryURLs(opt.rejectdiscovery);
165   }
166
167   if (opt.dumpdescription) {
168     dumpjobdescription(usercfg, jobdescriptionlist, services, opt.requestedSubmissionInterfaceName);
169     return NULL;
170   }
171
172   return submit(usercfg, jobdescriptionlist, services, opt.requestedSubmissionInterfaceName, opt.jobidoutfile, opt.direct_submission);
173 }
174
175 class HandleSubmittedJobs : public Arc::EntityConsumer<Arc::Job> {
176 public:
177   HandleSubmittedJobs(const std::string& jobidfile, const Arc::UserConfig& uc) : jobidfile(jobidfile), uc(uc), submittedJobs() {}
178
179   void addEntity(const Arc::Job& j) {
180     std::cout << Arc::IString("Job submitted with jobid: %s", j.JobID) << std::endl;
181     submittedJobs.push_back(j);
182   }
183   
184   void write() const {
185     if (!jobidfile.empty() && !Arc::Job::WriteJobIDsToFile(submittedJobs, jobidfile)) {
186       logger.msg(Arc::WARNING, "Cannot write job IDs to file (%s)", jobidfile);
187     }
188     Arc::JobInformationStorage* jobStore = createJobInformationStorage(uc);
189     if (jobStore == NULL || !jobStore->Write(submittedJobs)) {
190       if (jobStore == NULL) {
191         std::cerr << Arc::IString("Warning: Unable to open job list file (%s), unknown format", uc.JobListFile()) << std::endl;
192       }
193       else {
194         std::cerr << Arc::IString("Warning: Failed to write job information to file (%s)", uc.JobListFile()) << std::endl;
195       }
196       std::cerr << "         " << Arc::IString("To recover missing jobs, run arcsync") << std::endl;
197     }
198     delete jobStore;
199   }
200
201   void printsummary(const std::list<Arc::JobDescription>& originalDescriptions, const std::list<const Arc::JobDescription*>& notsubmitted) const {
202     if (originalDescriptions.size() > 1) {
203       std::cout << std::endl << Arc::IString("Job submission summary:") << std::endl;
204       std::cout << "-----------------------" << std::endl;
205       std::cout << Arc::IString("%d of %d jobs were submitted", submittedJobs.size(), submittedJobs.size()+notsubmitted.size()) << std::endl;
206       if (!notsubmitted.empty()) {
207         std::cout << Arc::IString("The following %d were not submitted", notsubmitted.size()) << std::endl;
208         for (std::list<const Arc::JobDescription*>::const_iterator it = notsubmitted.begin();
209              it != notsubmitted.end(); ++it) {
210           int jobnr = 1;
211           for (std::list<Arc::JobDescription>::const_iterator itOrig = originalDescriptions.begin();
212                itOrig != originalDescriptions.end(); ++itOrig, ++jobnr) {
213             if (&(*itOrig) == *it) {
214               std::cout << Arc::IString("Job nr.") << " " << jobnr;
215               if (!(*it)->Identification.JobName.empty()) {
216                 std::cout << ": " << (*it)->Identification.JobName;
217               }
218               std::cout << std::endl;
219               break;
220             }
221           }
222         }
223       }
224     }
225   }
226
227   void clearsubmittedjobs() { submittedJobs.clear(); }
228
229   char* getJobID() {
230     return strdup(submittedJobs.back().JobID.c_str());
231   }
232
233 private:
234   const std::string jobidfile;
235   const Arc::UserConfig& uc;
236   std::list<Arc::Job> submittedJobs;
237 };
238
239
240 static char* submit(const Arc::UserConfig& usercfg, const std::list<Arc::JobDescription>& jobdescriptionlist, std::list<Arc::Endpoint>& services, const std::string& requestedSubmissionInterface, const std::string& jobidfile, bool direct_submission) {
241   int retval = 0;
242   
243   HandleSubmittedJobs hsj(jobidfile, usercfg);
244   Arc::Submitter s(usercfg);
245   s.addConsumer(hsj);
246
247   Arc::SubmissionStatus status;
248   if (!direct_submission) {
249     std::list<std::string> rsi;
250     if (!requestedSubmissionInterface.empty()) rsi.push_back(requestedSubmissionInterface);
251     status = s.BrokeredSubmit(services, jobdescriptionlist, rsi);
252   }
253   else {
254     if (!requestedSubmissionInterface.empty()) {
255       for (std::list<Arc::Endpoint>::iterator it = services.begin(); it != services.end();) {
256         // Remove endpoint - it has an unrequested interface name.
257         if (!it->InterfaceName.empty() && it->InterfaceName != requestedSubmissionInterface) {
258           logger.msg(Arc::INFO, "Removing endpoint %s: It has an unrequested interface (%s).", it->URLString, it->InterfaceName);
259           it = services.erase(it);
260           continue;
261         }
262         
263         it->InterfaceName = requestedSubmissionInterface;
264         ++it;
265       }
266     }
267     status = s.Submit(services, jobdescriptionlist);
268   }
269   hsj.write();
270
271   if (status.isSet(Arc::SubmissionStatus::BROKER_PLUGIN_NOT_LOADED)) {
272     std::cerr << Arc::IString("ERROR: Unable to load broker %s", usercfg.Broker().first) << std::endl;
273     return NULL;
274   }
275   if (status.isSet(Arc::SubmissionStatus::NO_SERVICES)) {
276    std::cerr << Arc::IString("ERROR: Job submission aborted because no resource returned any information") << std::endl;
277    return NULL;
278   }
279   if (status.isSet(Arc::SubmissionStatus::DESCRIPTION_NOT_SUBMITTED)) {
280     std::cerr << Arc::IString("ERROR: One or multiple job descriptions was not submitted.") << std::endl;
281     retval = 1;
282   }
283   if (status.isSet(Arc::SubmissionStatus::SUBMITTER_PLUGIN_NOT_LOADED)) {
284     bool gridFTPJobPluginFailed = false;
285     for (std::map<Arc::Endpoint, Arc::EndpointSubmissionStatus>::const_iterator it = s.GetEndpointSubmissionStatuses().begin();
286          it != s.GetEndpointSubmissionStatuses().end(); ++it) {
287       if (it->first.InterfaceName == "org.nordugrid.gridftpjob" && it->second == Arc::EndpointSubmissionStatus::NOPLUGIN) {
288         gridFTPJobPluginFailed = true;
289       }
290     }
291     if (gridFTPJobPluginFailed) {
292       Arc::LogLevel level  = (retval == 1 ? Arc::ERROR : Arc::INFO);
293       std::string indent   = (retval == 1 ? "       " : "      ");
294       logger.msg(level, "A computing resource using the GridFTP interface was requested, but\n"
295                         "%sthe corresponding plugin could not be loaded. Is the plugin installed?\n"
296                         "%sIf not, please install the package 'nordugrid-arc-plugins-globus'.\n"
297                         "%sDepending on your type of installation the package name might differ.", indent, indent, indent);
298     }
299     // TODO: What to do when failing to load other plugins.
300   }
301     
302   hsj.printsummary(jobdescriptionlist, s.GetDescriptionsNotSubmitted());
303
304   if (retval == 0) {
305     return hsj.getJobID();
306   }
307   return NULL;
308 }
309
310 static int dumpjobdescription(const Arc::UserConfig& usercfg, const std::list<Arc::JobDescription>& jobdescriptionlist, const std::list<Arc::Endpoint>& services, const std::string& requestedSubmissionInterface) {
311   int retval = 0;
312
313   std::set<std::string> preferredInterfaceNames;
314   if (usercfg.InfoInterface().empty()) {
315     preferredInterfaceNames.insert("org.nordugrid.ldapglue2");
316   } else {
317     preferredInterfaceNames.insert(usercfg.InfoInterface());
318   }
319
320   Arc::ComputingServiceUniq csu;
321   Arc::ComputingServiceRetriever csr(usercfg, std::list<Arc::Endpoint>(), usercfg.RejectDiscoveryURLs(), preferredInterfaceNames);
322   csr.addConsumer(csu);
323   for (std::list<Arc::Endpoint>::const_iterator it = services.begin(); it != services.end(); it++) {
324     csr.addEndpoint(*it);
325   }
326   csr.wait();
327   std::list<Arc::ComputingServiceType> CEs = csu.getServices();
328
329
330   if (CEs.empty()) {
331     std::cout << Arc::IString("Unable to adapt job description to any resource, no resource information could be obtained.") << std::endl;
332     std::cout << Arc::IString("Original job description is listed below:") << std::endl;
333     for (std::list<Arc::JobDescription>::const_iterator it = jobdescriptionlist.begin();
334          it != jobdescriptionlist.end(); ++it) {
335       std::string descOutput;
336       it->UnParse(descOutput, it->GetSourceLanguage());
337       std::cout << descOutput << std::endl;
338     }
339     return 1;
340   }
341
342   Arc::Broker broker(usercfg, usercfg.Broker().first);
343   if (!broker.isValid(false)) {
344     logger.msg(Arc::ERROR, "Dumping job description aborted: Unable to load broker %s", usercfg.Broker().first);
345     return 1;
346   }
347
348   Arc::ExecutionTargetSorter ets(broker, CEs);
349   std::list<Arc::JobDescription>::const_iterator itJAlt; // Iterator to use for alternative job descriptions.
350   for (std::list<Arc::JobDescription>::const_iterator itJ = jobdescriptionlist.begin();
351        itJ != jobdescriptionlist.end(); ++itJ) {
352     const Arc::JobDescription* currentJobDesc = &*itJ;
353     bool descriptionDumped = false;
354     do {
355       Arc::JobDescription jobdescdump(*currentJobDesc);
356       ets.set(jobdescdump);
357
358       for (ets.reset(); !ets.endOfList(); ets.next()) {
359         if(!requestedSubmissionInterface.empty() && ets->ComputingEndpoint->InterfaceName != requestedSubmissionInterface) continue;
360         if (!jobdescdump.Prepare(*ets)) {
361           logger.msg(Arc::INFO, "Unable to prepare job description according to needs of the target resource (%s).", ets->ComputingEndpoint->URLString); 
362           continue;
363         }
364   
365         std::string jobdesclang = "nordugrid:jsdl";
366         if (ets->ComputingEndpoint->InterfaceName == "org.nordugrid.gridftpjob") {
367           jobdesclang = "nordugrid:xrsl";
368         }
369         else if (ets->ComputingEndpoint->InterfaceName == "org.glite.ce.cream") {
370           jobdesclang = "egee:jdl";
371         }
372         else if (ets->ComputingEndpoint->InterfaceName == "org.ogf.glue.emies.activitycreation") {
373           jobdesclang = "emies:adl";
374         }
375         std::string jobdesc;
376         if (!jobdescdump.UnParse(jobdesc, jobdesclang)) {
377           logger.msg(Arc::INFO, "An error occurred during the generation of job description to be sent to %s", ets->ComputingEndpoint->URLString); 
378           continue;
379         }
380   
381         std::cout << Arc::IString("Job description to be sent to %s:", ets->AdminDomain->Name) << std::endl;
382         std::cout << jobdesc << std::endl;
383         descriptionDumped = true;
384         break;
385       }
386
387       if (!descriptionDumped && itJ->HasAlternatives()) { // Alternative job descriptions.
388         if (currentJobDesc == &*itJ) {
389           itJAlt = itJ->GetAlternatives().begin();
390         }
391         else {
392           ++itJAlt;
393         }
394         currentJobDesc = &*itJAlt;
395       }
396     } while (!descriptionDumped && itJ->HasAlternatives() && itJAlt != itJ->GetAlternatives().end());
397
398     if (ets.endOfList()) {
399       std::cout << Arc::IString("Unable to prepare job description according to needs of the target resource.") << std::endl;
400       retval = 1;
401     }
402   } //end loop over all job descriptions
403
404   return retval;
405 }
406