Logo coherent WaveBurst  
Library Reference Guide
Logo
cwb_condor_recovery.C
Go to the documentation of this file.
1 /*
2 # Copyright (C) 2019 Gabriele Vedovato
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17 
18 
19 // Compare the list of jobs included in the dag file with the jobs that have finished (from history) and produce the condor recovery dag file : used by the cwb_condor command
20 
21 {
    // Start of the macro body: the statements below run in order when the macro is executed.
22  #include <vector>
23 
25 
    // Each configuration file path is read from the environment and passed to TB.checkFile.
    // NOTE(review): TB is not declared in the visible lines of this listing (a numbered line
    // is missing just above) - presumably a CWB::Toolbox instance; confirm.
26  TB.checkFile(gSystem->Getenv("CWB_ROOTLOGON_FILE"));
27  TB.checkFile(gSystem->Getenv("CWB_PARAMETERS_FILE"));
28  TB.checkFile(gSystem->Getenv("CWB_UPARAMETERS_FILE"));
29 
30  if(TString(condor_tag)=="") {
31  cout << endl;
32  cout << "cwb_condor_recovery.C : Error - the accounting_group is not defined !!!" << endl;
33  cout << "The accounting_group must be defined in the user_parameters.C file" << endl;
34  cout << "See the following link:" << endl;
35  cout <<" https://ldas-gridmon.ligo.caltech.edu/accounting/condor_groups/determine_condor_account_group.html" << endl;
36  cout << "Examples : " << endl;
37  cout << "strcpy(condor_tag,\"ligo.dev.o2.burst.allsky.cwboffline\");" << endl;
38  cout << "strcpy(condor_tag,\"ligo.prod.o2.burst.allsky.cwboffline\");" << endl;
39  cout << "If you don't need it set : strcpy(condor_tag,\"disabled\");" << endl << endl;
40  exit(1);
41  }
42  if(TString(condor_tag)=="disabled") strcpy(condor_tag,"");
43 
45  if(gSystem->Getenv("CWB_UPARAMETERS_FILE")==NULL) {
46  cout << "Error : environment CWB_UPARAMETERS_FILE is not defined!!!" << endl;exit(1);
47  } else {
48  cwb_uparameters_file=TString(gSystem->Getenv("CWB_UPARAMETERS_FILE"));
49  }
50 
51  // get cwb stage name : taken from the CWB_STAGE_NAME environment variable, default CWB_STAGE_FULL
52  TString cwb_stage_name="CWB_STAGE_FULL";
53  if(gSystem->Getenv("CWB_STAGE_NAME")!=NULL) {
54  cwb_stage_name=TString(gSystem->Getenv("CWB_STAGE_NAME"));
55  }
    // FULL is handled as LIKELIHOOD for the conversion below; because of this rewrite
    // the CWB_STAGE_FULL test in the conversion list can no longer match.
56  if(cwb_stage_name=="CWB_STAGE_FULL") cwb_stage_name="CWB_STAGE_LIKELIHOOD";
57  // convert stage name to value
    // cwb_resume_label = output_dir + file name prefix; it is compared later with the
    // job input file name when cwb_stage_resume is "TRUE". For each stage the prefix
    // appended is that of a different stage name (e.g. STRAIN -> "/init_").
    // NOTE(review): cwb_stage is not declared in the visible lines (a numbered line is missing here).
58  TString cwb_resume_label=output_dir; // used with resume
60  if(cwb_stage_name=="CWB_STAGE_FULL") {cwb_stage=CWB_STAGE_LIKELIHOOD; cwb_resume_label+="/supercluster_";}
61  if(cwb_stage_name=="CWB_STAGE_INIT") {cwb_stage=CWB_STAGE_INIT; cwb_resume_label+="";}
62  if(cwb_stage_name=="CWB_STAGE_STRAIN") {cwb_stage=CWB_STAGE_STRAIN; cwb_resume_label+="/init_";}
63  if(cwb_stage_name=="CWB_STAGE_CSTRAIN") {cwb_stage=CWB_STAGE_CSTRAIN; cwb_resume_label+="/strain_";}
64  if(cwb_stage_name=="CWB_STAGE_COHERENCE") {cwb_stage=CWB_STAGE_COHERENCE; cwb_resume_label+="/cstrain_";}
65  if(cwb_stage_name=="CWB_STAGE_SUPERCLUSTER") {cwb_stage=CWB_STAGE_SUPERCLUSTER;cwb_resume_label+="/coherence_";}
66  if(cwb_stage_name=="CWB_STAGE_LIKELIHOOD") {cwb_stage=CWB_STAGE_LIKELIHOOD; cwb_resume_label+="/supercluster_";}
    // Re-read CWB_STAGE_NAME: when the variable is defined this restores the name given by
    // the user (e.g. CWB_STAGE_FULL); when it is not defined the name stays CWB_STAGE_LIKELIHOOD.
67  if(gSystem->Getenv("CWB_STAGE_NAME")!=NULL) {
68  cwb_stage_name=TString(gSystem->Getenv("CWB_STAGE_NAME"));
69  }
70 
71  char full_condor_dir[1024];
72  sprintf(full_condor_dir,"%s/%s",work_dir,condor_dir);
73 
74  // check if condor dag file is present, otherwise it is created
75  // the dag file could be not present in the second stage analysis
76  bool exists = TB.isFileExisting(TString::Format("%s/%s.dag",full_condor_dir,data_label));
77  if(!exists) {
78  TString cwb_scripts = TString(gSystem->Getenv("CWB_SCRIPTS"));
79  TString exec_cmd = TString::Format("%s/cwb_condor.csh create",cwb_scripts.Data());
80  int ret=gSystem->Exec(exec_cmd);
81  if(ret) {cout << "Error while executing cwb_condor create !!!" << endl;exit(1);}
82  }
83 
84  // read condor job list : job IDs returned by TB.getCondorJobList for full_condor_dir and data_label
85  vector<int> jobList=TB.getCondorJobList(full_condor_dir, data_label);
86 
    // max_jobs = largest job ID found in the list; it is used below as the size of the per-job arrays
87  int max_jobs = 0;
88  for(int i=0;i<jobList.size();i++) if(jobList[i]>max_jobs) max_jobs=jobList[i];
89 
    // NOTE(review): in the visible lines jobStart and jobStop are only allocated here and
    // released with delete[] at the end of the macro - they look unused; confirm.
90  int* jobStart = new int[max_jobs];
91  int* jobStop = new int[max_jobs];
92 
    // NOTE(review): the declaration of jobStage is not present in this listing (a numbered line is missing here).
    // The array is indexed with jobID-1: every slot is first marked as excluded (-1),
    // then the slots of the jobs listed in the dag file are set to 0.
94  for (int i=0;i<max_jobs;i++) jobStage[i]=(CWB_STAGE)-1; // excluded jobs
95  for (int i=0;i<jobList.size();i++) jobStage[jobList[i]-1]=(CWB_STAGE)0; // jobs in the dag file
96 
97  char tag[256];sprintf(tag,"%s.dag.recovery.",data_label);
98  vector<TString> fileList = TB.getFileListFromDir(condor_dir, "", tag);
99  int iversion=0;
100  for(int i=0;i<fileList.size();i++) {
101  //cout << i << " " << fileList[i].Data() << endl;
102  TObjArray* token = TString(fileList[i]).Tokenize(TString("."));
103  TObjString* srecoveryID = (TObjString*)token->At(token->GetEntries()-1);
104  if(srecoveryID->GetString().IsDigit()) {
105  cout << i << " " << fileList[i].Data() << endl;
106  int recoveryID = srecoveryID->GetString().Atoi();
107  if(iversion<recoveryID) iversion=recoveryID;
108  }
109  }
110  iversion++;
111 
112  char dagfile[1024];
113  sprintf(dagfile,"%s/%s.dag.recovery.%d",condor_dir,data_label,iversion);
114 
115  // Check if dag file already exist
116  Long_t id,size,flags,mt;
117  int estat = gSystem->GetPathInfo(dagfile,&id,&size,&flags,&mt);
118  if (estat==0) {
119  char answer[256];
120  strcpy(answer,"");
121  do {
122  cout << "File \"" << dagfile << "\" already exist" << endl;
123  cout << "Do you want to overwrite the file ? (y/n) ";
124  cin >> answer;
125  cout << endl << endl;
126  } while ((strcmp(answer,"y")!=0)&&(strcmp(answer,"n")!=0));
127  if (strcmp(answer,"n")==0) {
128  exit(0);
129  }
130  }
131 
132  // get cwb_stage_resume : value of the CWB_STAGE_RESUME environment variable, when defined
133  // if true then recovery done only if previous cwb_stage is present in the output dir
     // (the value is compared with the string "TRUE" when the dag file is written)
     // NOTE(review): the declaration of cwb_stage_resume is not present in this listing (a numbered line is missing here).
135  if(gSystem->Getenv("CWB_STAGE_RESUME")!=NULL) {
136  cwb_stage_resume=TString(gSystem->Getenv("CWB_STAGE_RESUME"));
137  }
138 
139  // get cwb stage input dir (input files produced by the previous stage)
     // NOTE(review): the declaration of cwb_stage_input is not present in this listing (a numbered line is missing here).
141  if(gSystem->Getenv("CWB_STAGE_INPUT")!=NULL) {
142  cwb_stage_input=TString(gSystem->Getenv("CWB_STAGE_INPUT"));
143  }
     // an empty value falls back to output_dir; the resulting path is then passed to TB.checkFile
144  if(cwb_stage_input=="") cwb_stage_input=output_dir;
145  TB.checkFile(cwb_stage_input);
146 
147  // factor label : extract the last factor value
148  char sfactor[32]="";
149  if(simulation) {
150  if(simulation==3) {
151  if(factor<0) sprintf(sfactor,"_n%g",fabs(factors[nfactor-1]));
152  if(factor==0) sprintf(sfactor,"_z%g",factors[nfactor-1]);
153  if(factor>0) sprintf(sfactor,"_p%g",factors[nfactor-1]);
154  } else if(simulation==4) {
155  int ioffset = int(factors[0])<=0 ? 1 : int(factors[0]);
156  ioffset+=nfactor-1;
157  sprintf(sfactor,"_%i",ioffset);
158  } else sprintf(sfactor,"_%g",factors[nfactor-1]);
159  }
160  char job_label[512];sprintf(job_label,"%s%s",data_label,sfactor);
161 
162  cout << "Starting reading output directory ..." << endl;
163  vector<TString> jobFiles(max_jobs);
164  for(int i=0;i<max_jobs;i++) jobFiles[i]=cwb_uparameters_file;
165  fileList = TB.getFileListFromDir(cwb_stage_input,".root","",data_label,true);
166  for(int n=0;n<fileList.size();n++) {
167 
168  int jobId = TB.getJobId(fileList[n]); // Get JOB ID
169  jobId-=1;
170 
171  if(fileList[n].BeginsWith(cwb_stage_input+"/init_"))
172  if(CWB_STAGE_INIT>jobStage[jobId])
173  {jobStage[jobId]=CWB_STAGE_INIT;jobFiles[jobId]=fileList[n];continue;}
174  if(fileList[n].BeginsWith(cwb_stage_input+"/strain_"))
175  if(CWB_STAGE_STRAIN>jobStage[jobId])
176  {jobStage[jobId]=CWB_STAGE_STRAIN;jobFiles[jobId]=fileList[n];continue;}
177  if(fileList[n].BeginsWith(cwb_stage_input+"/cstrain_"))
178  if(CWB_STAGE_CSTRAIN>jobStage[jobId])
179  {jobStage[jobId]=CWB_STAGE_CSTRAIN;jobFiles[jobId]=fileList[n];continue;}
180  if(fileList[n].BeginsWith(cwb_stage_input+"/coherence_"))
181  if(CWB_STAGE_COHERENCE>jobStage[jobId])
182  {jobStage[jobId]=CWB_STAGE_COHERENCE;jobFiles[jobId]=fileList[n];continue;}
183  if(fileList[n].BeginsWith(cwb_stage_input+"/supercluster_"))
184  if(CWB_STAGE_SUPERCLUSTER>jobStage[jobId])
185  {jobStage[jobId]=CWB_STAGE_SUPERCLUSTER;jobFiles[jobId]=fileList[n];continue;}
186  if(fileList[n].BeginsWith(cwb_stage_input+"/wave_")&&fileList[n].Contains(job_label))
187  if(CWB_STAGE_LIKELIHOOD>jobStage[jobId])
188  {jobStage[jobId]=CWB_STAGE_LIKELIHOOD;jobFiles[jobId]=fileList[n];continue;}
189 
190 /*
191  // Get STOP JOB info from history
192  TFile *ifile = TFile::Open(fileList[n]);
193  if(ifile==NULL) {cout << "Failed to open " << fileList[n].Data() << endl;exit(-1);}
194  CWB::History* ihistory = (CWB::History*)ifile->Get("history");
195  if(ihistory==NULL) { cout << "Error : history is not present!!!" << endl;exit(1); }
196  int log_size = ihistory->GetLogSize("FULL");
197  TString log = ihistory->GetLog("FULL",log_size-1);
198  ifile->Close();
199  if(log!="STOP JOB") nrecovery++;
200 */
201  }
202  //for (int i=0;i<max_jobs;i++) cout << i << " " << jobStage[i] << " " << jobFiles[i].Data() << " " << cwb_stage << endl;
203 
204  int nrecovery=0;
205  for (int i=0;i<max_jobs;i++)
206  if ((jobStage[i]>=cwb_stage)||(jobStage[i]>=CWB_STAGE_LIKELIHOOD)) nrecovery++;
207  nrecovery=jobList.size()-nrecovery;
208  if(nrecovery==0) {
209  cout << "No Jobs to be recovered" << endl;
210  gSystem->Exit(0);
211  }
212  cout << endl;
213  cout << "New Recovey File " << endl;
214  cout << dagfile << endl;
215 
216  // condor log dirs
217  char full_condor_out_dir[1024];
218  char full_condor_err_dir[1024];
219  sprintf(full_condor_out_dir,"%s/%s",work_dir,log_dir);
220  sprintf(full_condor_err_dir,"%s/%s",work_dir,log_dir);
221 
222  // create dag condor file
223  sprintf(full_condor_dir,"%s/%s",work_dir,condor_dir);
224 
225  ofstream out;
226  out.open(dagfile,ios::out);
227  int cnt = 0;
228  for (int i=0;i<max_jobs;i++) {
229  if (i%1000==0) cout << i << "/" << max_jobs << endl;
230  if ((jobStage[i]!=(CWB_STAGE)-1)&&(jobStage[i]<cwb_stage)&&(jobStage[i]<CWB_STAGE_LIKELIHOOD)) {
231  if(cwb_stage_resume=="TRUE") if(!jobFiles[i].BeginsWith(cwb_resume_label)) continue;
232  cnt++;
233  char ostring[256];
234  int jobID=i+1;
235  sprintf(ostring,"JOB A%i %s/%s.sub.recovery.%d",jobID,full_condor_dir,data_label,iversion);
236  out << ostring << endl;
237  sprintf(ostring,"VARS A%i PID=\"%i\" CWB_UFILE=\"%s\" CWB_STAGE=\"%s\"",
238  jobID,jobID,jobFiles[i].Data(),cwb_stage_name.Data());
239  out << ostring << endl;
240  sprintf(ostring,"RETRY A%i 3000",jobID);
241  out << ostring << endl;
242  // remove broken symbolic links of condor log files (avoid init condor failure)
243  TString path;
244  char symlink[1024];
245  Long_t id,size,flags,mt;
246  sprintf(symlink,"%s/%d_%s_%s.out",full_condor_out_dir,jobID,data_label,cwb_stage_name.Data());
247  path = CWB::Toolbox::getFileName(symlink);
248  if(path!="") {
249  int estat = gSystem->GetPathInfo(path.Data(),&id,&size,&flags,&mt);
250  if(estat!=0) { // condor log out symbolic link is broken
251  char cmd[1024]; sprintf(cmd,"rm -f %s",symlink);
252  gSystem->Exec(cmd);
253  }
254  }
255  sprintf(symlink,"%s/%d_%s_%s.err",full_condor_err_dir,jobID,data_label,cwb_stage_name.Data());
256  path = CWB::Toolbox::getFileName(symlink);
257  if(path!="") {
258  int estat = gSystem->GetPathInfo(symlink,&id,&size,&flags,&mt);
259  if(estat!=0) { // condor log err symbolic link is broken
260  char cmd[1024]; sprintf(cmd,"rm -f %s",symlink);
261  gSystem->Exec(cmd);
262  }
263  }
264  }
265  }
266  out.close();
267 
// LSF branch, selected when the _USE_LSF environment variable is defined:
// it packs the working directory, reports the LSF file and ends the macro.
268  if(gSystem->Getenv("_USE_LSF")!=NULL) {
269 
270  // make lsf label : data_label, followed by the stage name unless the stage is CWB_STAGE_FULL
271  char lsf_label[1024];
272  if(cwb_stage_name=="CWB_STAGE_FULL") {
273  sprintf(lsf_label,"%s",data_label);
274  } else {
275  sprintf(lsf_label,"%s_%s",data_label,cwb_stage_name.Data());
276  }
277 
278  // create tgz of the working dir
     // NOTE(review): the argument list of this TString::Format call is not present
     // in this listing (a numbered line is missing after the format string).
279  TString exec_cmd = TString::Format("tar -czf %s/%s.tgz %s %s %s %s/*.sh --exclude='*/.svn'",
281  gSystem->Exec(exec_cmd);
282  cout << endl << "Created tgz file : " << condor_dir<<"/"<<lsf_label<<".tgz" << endl;
283 
284  // create LSF file from the DAG file
     // NOTE(review): the statement defining lsfFile is not present in this listing
     // (a numbered line is missing here) - presumably the result of a DAG to LSF conversion; confirm.
     // An empty lsfFile is reported as "no jobs to be submitted".
286  if(lsfFile!="") {
287  cout << endl << "Unfinished Jobs : " << cnt << "/" << jobList.size() << endl;
288  cout << endl << "Created LSF file : " << lsfFile << endl << endl;
289  cout << "To submit LSF recovered jobs, type :" << endl;
290  cout << "cwb_lsf submit " << lsfFile << endl << endl;
291  } else {
292  cout << endl << "No jobs to be submitted !!!" << endl << endl;
293  }
     // the macro ends here for LSF, both when an LSF file was created and when it was not
294  gSystem->Exit(0);
295  }
296 
297  if(gSystem->Getenv("_USE_PEGASUS")!=NULL) {
298  // create in tgz file
299  ofstream out;
300  char infile[1024];
301  sprintf(infile,"%s/%s.in.recovery.%d",condor_dir,data_label,iversion);
302  out.open(infile,ios::out);
303  out << "../" << config_dir << "/" << endl;
304  out << "../" << input_dir << "/" << endl;
305  out << "../" << macro_dir << "/" << endl;
306 /*
307  for (int i=0;i<max_jobs;i++) {
308  if ((jobStage[i]!=-1)&&(jobStage[i]<cwb_stage)&&(jobStage[i]<CWB_STAGE_LIKELIHOOD)) {
309  if(jobStage[i]!=CWB_STAGE_FULL) out << "../" << jobFiles[i] << endl;
310  }
311  }
312 */
313  out.close();
314  // execute cwb_pegasus_create.sh
315  sprintf(dagfile,"%s.dag.recovery.%d",data_label,iversion);
316  TString cwb_scripts = TString(gSystem->Getenv("CWB_SCRIPTS"));
317  TString exec_cmd = TString::Format("cd %s;%s/cwb_pegasus_create.sh %s",
318  condor_dir,cwb_scripts.Data(),dagfile);
319  int ret=gSystem->Exec(exec_cmd);
320  if(ret) {cout << "Error while executing cwb_pegasus_create !!!" << endl;exit(1);}
321 
322  cout << endl;
323  cout << "Unfinished Jobs : " << cnt << "/" << jobList.size() << endl;
324  cout << endl;
325  sprintf(dagfile,"%s/%s.dag.recovery.%d",condor_dir,data_label,iversion);
326  cout << "To submit pegasus recovered jobs, type :" << endl;
327  cout << "cwb_pegasus submit " << dagfile << endl;
328  } else {
329  // create sub condor file
330  char extention[1024];
331  sprintf(extention,"recovery.%d",iversion);
332  TB.createSubFile(data_label, full_condor_dir, full_condor_out_dir,
333  full_condor_err_dir, condor_log, extention, condor_tag);
334  cout << endl;
335  cout << "Unfinished Jobs : " << cnt << "/" << jobList.size() << endl;
336  cout << endl;
337  sprintf(dagfile,"%s/%s.dag.recovery.%d",condor_dir,data_label,iversion);
338  cout << "To submit condor recovered jobs, type :" << endl;
339  cout << "cwb_condor submit " << dagfile << endl;
340  }
341  cout << endl;
342 
// Second block executed when _USE_LSF is defined: it creates a tgz of the working dir.
// NOTE(review): the earlier _USE_LSF branch ends with gSystem->Exit(0), so this block
// looks unreachable - confirm before relying on it.
343  if(gSystem->Getenv("_USE_LSF")!=NULL) {
344 
     // file name prefix of the stage, default "supercluster_"
     // NOTE(review): cwb_stage_input is filled earlier with a directory path, while here it
     // is compared with stage keywords - confirm that this is the intended variable.
345  TString cwb_stage_label="supercluster_";
346  if( cwb_stage_input=="FULL") cwb_stage_label="wave_";
347  if( cwb_stage_input=="INIT") cwb_stage_label="init_";
348  if( cwb_stage_input=="STRAIN") cwb_stage_label="strain_";
349  if( cwb_stage_input=="CSTRAIN") cwb_stage_label="cstrain_";
350  if( cwb_stage_input=="COHERENCE") cwb_stage_label="coherence_";
351  if( cwb_stage_input=="SUPERCLUSTER") cwb_stage_label="supercluster_";
352  if( cwb_stage_input=="LIKELIHOOD") cwb_stage_label="wave_";
353 
354  int jobID=1; // jobID must be defined -> TO BE FIXED !!!!
     // The "" pairs inside the format are adjacent (empty) string literals: they are
     // concatenated by the compiler and add no quote character to the command.
     // NOTE(review): the variable is exported by a command run through gSystem->Exec,
     // presumably in a child shell, so the Getenv test below may never see it - confirm.
355  TString exec_cmd = TString::Format("export file_n_st=""$(ls %s*_job%i.root)""",cwb_stage_label.Data(),jobID);
356  gSystem->Exec(exec_cmd);
357  gSystem->Exec("echo $file_n_st");
358  if(gSystem->Getenv("file_n_st")!=NULL) {
359  char *file_tmp = gSystem->ExpandPathName("output/$file_n_st");
360  //cout<<file_tmp<<endl;
     // NOTE(review): part of the argument list of this call is not present in this listing
     // (a numbered line is missing after the format string).
361  exec_cmd = TString::Format("tar -czf %s/%s.tgz %s %s %s %s %s %s --exclude='*/.svn' --exclude='%s/*' --exclude='%s/*'",
363  log_dir, macro_dir, output_dir, log_dir, file_tmp);
364  } else {
365  // create tgz of the working dir
     // NOTE(review): the argument list of this call is not present in this listing
     // (numbered lines are missing after the format string).
366  exec_cmd = TString::Format("tar -czf %s/%s.tgz %s %s %s %s %s --exclude='*/.svn' --exclude='%s/*' --exclude='%s/*'",
369  gSystem->Exec(exec_cmd);
370  cout << endl << "Created tgz file : " << condor_dir<<"/"<<data_label<<".tgz" << endl;
371  }
372  }
373 
374  delete [] jobStage;
375  delete [] jobStart;
376  delete [] jobStop;
     // the per-job arrays are released only on this path; the earlier exit(...) and
     // gSystem->Exit(...) calls end the macro before reaching these lines
377 
     // end of the macro
378  gSystem->Exit(0);
379 }
TString cwb_scripts
Long_t size
TString cwb_stage_name
static vector< TString > getFileListFromDir(TString dir_name, TString endString="", TString beginString="", TString containString="", bool fast=false)
Definition: Toolbox.cc:5108
sprintf(full_condor_dir,"%s/%s", work_dir, condor_dir)
float factor
TString cwb_stage_input
TString cwb_stage_label
Definition: cwb_clonedir.C:164
vector< int > jobList
int n
Definition: cwb_net.C:28
Long_t flags
char dagfile[1024]
TString("c")
Long_t mt
ofstream out
Definition: cwb_merge.C:214
char full_condor_dir[1024]
CWB::Toolbox TB
int max_jobs
char macro_dir[512]
Definition: test_config1.C:150
i drho i
static bool checkFile(TString fName, bool question=false, TString message="")
Definition: Toolbox.cc:4670
int jobID
Definition: cwb_net.C:195
char full_condor_out_dir[1024]
char data_label[512]
Definition: test_config1.C:160
char input_dir[512]
Definition: test_config1.C:145
int nrecovery
static int createSubFile(TString label, TString condor_dir, TString out_dir, TString err_dir, TString log_dir, TString ext="", TString condor_tag="")
Definition: Toolbox.cc:1337
int estat
Long_t id
i() int(T_cor *100))
char job_label[512]
CWB_STAGE * jobStage
int jobStop[max_jobs+1]
char data_dir[512]
Definition: test_config1.C:152
TObjArray * token
char log_dir[512]
Definition: test_config1.C:151
char config_dir[512]
Definition: test_config1.C:144
static int getJobId(TString file, TString fext="root")
Definition: Toolbox.cc:6697
char tag[256]
Definition: cwb_merge.C:92
char full_condor_err_dir[1024]
int jobStart[max_jobs+1]
char answer[256]
static vector< int > getCondorJobList(TString condor_dir, TString label)
Definition: Toolbox.cc:1398
cout<< "Starting reading output directory ..."<< endl;vector< TString > fileList
double fabs(const Complex &x)
Definition: numpy.cc:55
static TString DAG2LSF(char *dagFile, char *data_label, char *nodedir, char *data_dir, char *condor_dir, char *log_dir, char *output_dir, char *work_dir)
Definition: Toolbox.cc:1561
char cmd[1024]
strcpy(RunLabel, RUN_LABEL)
int cnt
int nfactor
Definition: test_config1.C:83
TString cwb_stage_resume
cout<< "Starting reading output directory ..."<< endl;vector< TString > jobFiles(max_jobs)
bool exists
char condor_log[512]
Definition: test_config1.C:163
static TString getFileName(FILE *fp)
Definition: Toolbox.cc:6780
TString cwb_uparameters_file
int jobId
char nodedir[1024]
Definition: test_config1.C:187
char condor_dir[512]
Definition: test_config1.C:148
char work_dir[512]
Definition: test_config1.C:143
static bool isFileExisting(TString fName)
Definition: Toolbox.cc:4651
CWB_STAGE cwb_stage
simulation
Definition: cwb_eced.C:26
factors[0]
Definition: cwb_eced.C:27
char output_dir[512]
Definition: test_config1.C:146
CWB_STAGE
Definition: cwb.hh:122
exit(0)