#!/usr/bin/env python
#
# Program : ES3ingest.py
# Author  : Peter Slaughter
# Purpose : Ingest a foreign provenance file into ES3. This task is part of the
#           IPAW Second Provenance Challenge which challenges IPAW SPC team
#           members to ingest provenance data from other teams.
# Date    : April, 2007
#
"""Ingest a PASS or VisTrails provenance file into ES3.

Input formats (-t):
  PASS       a single PASS provenance XML file
  VisTrails  a VisTrails workflow XML file; also requires the execution
             log file given with -e

Other options:
  -v  increment the verbosity counter (may be repeated)
  -n  set the 'noop' flag (not currently consulted by the ingesters)
"""
import sys
import string
import re
import commands
import os
import socket
import getopt
import time

#TODO: fold this into EIL_XML:
from XMLutils import XMLdoc
import ES3clientLib

#logFile = '%sES3ingest.log' % (LOG_DIR)

# Command-line flags set by main(); nothing in this file reads them yet.
noop = 0
verbose = 0
providerName = ''

# localId -> ES3 UUID; filled by localIdToUUID().
localIdMap = {}
# ES3 UUID -> localId (reverse of localIdMap); filled by localIdToUUID().
UUIDmap = {}

# Link type used for every provenance link this script creates.
INPUT_TO = 'http://eil.bren.ucsb.edu/vocabularies/es3/inputTo'


class provenanceObject:
    """One PASS provenance node (a process or a file) collected by ingestPASS().

    The fields are filled from the <record> children of the <provenance>
    elements that share this object's pnode.
    """

    def __init__(self, localId):
        self.localId = localId      # PASS pnode identifier
        self.commandLine = ''       # from an ARGV record
        self.userXML = ''           # ENV record rendered by textToXML()
        self.filePath = ''          # from a PATH record
        self.links = []             # dicts with 'type', 'fromUuid', 'toUuid'
        self.objectName = ''        # from a NAME record
        self.objectType = ''        # 'PROC', 'FILE', or a TYPE record value
        self.pid = ''               # from a PID record
        self.uuid = ''              # ES3 UUID from localIdToUUID()
        self.workflowUuid = ''

    def list(self):
        """Print this object's fields to stdout, one "label value" line each.

        The fields shown depend on objectType ('PROC', 'FILE', or anything else).
        """
        if self.objectType == "PROC":
            fields = [
                ("objectType: ", self.objectType),
                ("commandLine: ", self.commandLine),
                ("links: ", self.links),
                ("userXML: ", self.userXML),
                ("objectname: ", self.objectName),
                ("pid: ", self.pid),
                ("localId: ", self.localId),
                ("uuid: ", self.uuid),
                ("workflowUuid: ", self.workflowUuid),
            ]
        elif self.objectType == "FILE":
            fields = [
                ("objectType: ", self.objectType),
                ("file path: ", self.filePath),
                ("links: ", self.links),
                ("objectName", self.objectName),
                ("localId: ", self.localId),
                ("uuid: ", self.uuid),
                ("workflowUuid: ", self.workflowUuid),
            ]
        else:
            fields = [
                ("Object type unknown: ", self.objectType),
                ("commandLine: ", self.commandLine),
                ("file path: ", self.filePath),
                ("links: ", self.links),
                ("userXML: ", self.userXML),
                ("objectname: ", self.objectName),
                ("pid: ", self.pid),
                ("localId: ", self.localId),
                ("uuid: ", self.uuid),
                ("workflowUuid: ", self.workflowUuid),
            ]
        for label, value in fields:
            # Same output as the Python 2 statement "print label, value",
            # which puts one space between the two items.
            print("%s %s" % (label, value))


class moduleObject:
    """Record with the same fields as provenanceObject.

    NOTE(review): not referenced anywhere else in this file.
    """

    def __init__(self, localId):
        self.localId = localId
        self.commandLine = ''
        self.userXML = ''
        self.filePath = ''
        self.links = []
        self.objectName = ''
        self.objectType = ''
        self.pid = ''
        self.uuid = ''
        self.workflowUuid = ''


def usage(*args):
    """Print any error text in args, then the usage message, to stderr; exit with status 2."""
    sys.stdout = sys.stderr
    for msg in args:
        print("Error: %s" % (msg,))
    print("Usage: ES3ingest.py -t <input format> [-e <execution log file>] [-v] [-n] <provenance file>")
    print("")
    print(" example:")
    print("")
    print(" ES3ingest.py -t PASS challenge-D.xml")
    print("")
    print(" if <input format> = \"VisTrails\", then you must also use")
    print("")
    print(" -e <execution log file>, i.e.")
    print("")
    print(" example:")
    print("")
    print(" ES3ingest.py -t VisTrails -e pc_log.xml pc_part3a.xml")
    print(__doc__)
    sys.exit(2)


def main():
    """Parse the command line and dispatch to the ingester for the input format."""
    global verbose
    global providerName
    global noop

    try:
        opts, args = getopt.getopt(sys.argv[1:], 't:ve:n')
    except getopt.error:
        usage(sys.exc_info()[1])  # exits

    if len(args) < 1:
        usage()  # exits

    inputFileFormat = ''
    executionLogFilename = ''
    verbose = 0
    for o, a in opts:
        if o == '-t':
            inputFileFormat = a
        elif o == '-e':
            executionLogFilename = a
        elif o == '-v':
            verbose = verbose + 1
        elif o == '-n':
            noop = 1

    print("inputFileFormat %s" % (inputFileFormat,))
    print("args:  %s" % (args,))
    inputFilename = args[0]

    # Process the provenance file based on the file type (i.e. PASS, VisTrails, etc)
    if inputFileFormat == "PASS":
        ingestPASS(inputFilename)
    elif inputFileFormat == "VisTrails":
        if not executionLogFilename:
            usage("-e <execution log file> is required for VisTrails input")
        ingestVisTrails(inputFilename, executionLogFilename)
    else:
        print("Unknown input filetype:  %s" % (inputFileFormat,))


def localIdToUUID(localId):
    """Return the ES3 UUID for localId, generating one on first use.

    The mapping is cached in localIdMap, and the reverse is kept in UUIDmap,
    so the same localId always maps to the same UUID within one run.
    """
    try:
        return localIdMap[localId]
    except KeyError:
        uuid = ES3clientLib.generateUuid()
        localIdMap[localId] = uuid
        UUIDmap[uuid] = localId
        return uuid


def printLocalIds():
    """Print every localId -> UUID entry in localIdMap."""
    for localId in localIdMap.keys():
        print("localIdMap['%s'] = %s" % (localId, localIdMap[localId]))


def UUIDtoLocalId(uuid):
    """Return the localId that localIdToUUID() mapped to uuid, or 0 if unknown."""
    try:
        return UUIDmap[uuid]
    except KeyError:
        print("Error: UUID %s not found." % uuid)
        return 0


def textToXML(text, tokenSeparator, elementSeparator, rootElementTag):
    """Render delimited text as a flat string of XML elements.

    text is split into tokens on tokenSeparator, or on any whitespace when
    tokenSeparator is ''. When elementSeparator is non-empty, each token is
    split once on it into "name<sep>value" and emitted as
    <name>value</name>. For example, "OSTYPE=NetBsd" becomes
    <OSTYPE>NetBsd</OSTYPE>. Tokens without elementSeparator are skipped
    with a message. When elementSeparator is '', the token is used as both
    tag and value. If rootElementTag is non-empty, the elements are wrapped
    in <rootElementTag>...</rootElementTag>.

    Element values are XML-escaped; tag names are used verbatim.
    Returns the XML string. Raises RuntimeError if text cannot be split.
    """
    from xml.sax.saxutils import escape

    if tokenSeparator != '':
        print("spliting with token")
        try:
            tokens = text.split(tokenSeparator)
        except (AttributeError, TypeError, ValueError):
            raise RuntimeError("Error converting text into XML using separator '%s'. Text: %s"
                               % (tokenSeparator, text))
    else:
        try:
            tokens = text.split()
        except (AttributeError, TypeError):
            raise RuntimeError("Error converting text into XML using whitespace separator. Text: %s"
                               % (text,))

    parts = []
    if rootElementTag != "":
        parts.append('<%s>' % rootElementTag)
    for t in tokens:
        print("token:  %s" % (t,))
        if elementSeparator != '':
            # Split only once so values that contain the separator survive.
            pieces = t.split(elementSeparator, 1)
            if len(pieces) != 2:
                print("No '%s' in token, skipping: %s" % (elementSeparator, t))
                continue
            tagName, value = pieces
        else:
            tagName = value = t
        parts.append('<%s>%s</%s>' % (tagName, escape(value), tagName))
    if rootElementTag != "":
        parts.append('</%s>' % rootElementTag)

    XMLstr = ''.join(parts)
    print("XML:  %s" % (XMLstr,))
    return XMLstr


def _passRecordData(docProvenance, recElement):
    """Return the stripped text of a PASS record's ./record-data/data element."""
    return docProvenance.getText(u'./record-data/data', recElement).strip()


def _applyPASSRecord(docProvenance, recElement, recType, pObj, provUuid):
    """Copy one PASS <record> of type recType into pObj (FORKPARENT is handled by the caller)."""
    if recType == "PID":
        # If record type 'PID' is encountered then assume object type is "PROC"
        pObj.pid = _passRecordData(docProvenance, recElement)
        pObj.objectType = 'PROC'
    elif recType == 'PATH':
        pObj.filePath = _passRecordData(docProvenance, recElement)
        # HACK: provenance records for files that are only read do not contain
        # a 'TYPE' record (maybe a bug?), so a PATH record marks the object as a FILE.
        pObj.objectType = 'FILE'
    elif recType == "TYPE":
        pObj.objectType = _passRecordData(docProvenance, recElement)
    elif recType == "INPUT":
        # Link from the xref'd pnode (source) to the current pnode (destination).
        xref = docProvenance.getAttrText(u'./record-data/xref/@pnode', recElement)
        uuid = localIdToUUID(xref)
        print("xref: %s; uuid: %s" % (xref, uuid))
        pObj.links.append({'type': INPUT_TO,
                           'fromUuid': '%s' % uuid,
                           'toUuid': '%s' % provUuid})
    elif recType == "NAME":
        pObj.objectName = _passRecordData(docProvenance, recElement)
    elif recType == "ENV":
        tmpStr = _passRecordData(docProvenance, recElement)
        if tmpStr != '':
            pObj.userXML = textToXML(tmpStr, '', '=', 'environment')
        else:
            pObj.userXML = ''
    elif recType == "ARGV":
        pObj.commandLine = _passRecordData(docProvenance, recElement)
    else:
        print("Unknown record type %s, skipping" % recType)


def _parsePASSProvenance(docProvenance):
    """Build provenanceObjects from a parsed PASS document.

    Returns (provObjs, workflowUuid). provObjs maps pnode -> provenanceObject.
    workflowUuid is the UUID of the last PROC named "sh" whose ARGV is
    "[nil]" (treated as the parent process), or '' if there is none.
    """
    provObjs = {}
    workflowUuid = ''
    for element in docProvenance.getNodes(u'/pass-data/provenance', ''):
        # pnode is the locally unique identifier for this object; translate it
        # to a UUID (the mapping is kept in the global localIdMap).
        pNode = docProvenance.getAttrText(u'./@pnode', element)
        provUuid = localIdToUUID(pNode)

        # Several <provenance> elements may share a pnode; merge them.
        try:
            pObj = provObjs[pNode]
            print("Reusing existing provenance object, type: %s, pnode: %s, uuid: %s"
                  % (pObj.objectType, pNode, provUuid))
        except KeyError:
            pObj = provenanceObject(pNode)
            print("Creating new provenance object")
        pObj.uuid = provUuid

        for recElement in docProvenance.getNodes(u'./record', element):
            recType = docProvenance.getText(u'./record-type', recElement)
            if recType == "FORKPARENT":
                # ES3 does not track FORKPARENT.
                # NOTE(review): this stops reading the remaining records of this
                # element, although the original comment said "skip to next
                # provenance record"; confirm whether `continue` was intended.
                break
            _applyPASSRecord(docProvenance, recElement, recType, pObj, provUuid)

        if (pObj.objectType == 'PROC' and pObj.objectName.lower() == "sh"
                and pObj.commandLine == "[nil]"):
            workflowUuid = localIdToUUID(pNode)
            print("pNode %s; workflowUuid: %s" % (pNode, workflowUuid))
            print("found parent pid, pnode = %s. Setting workflowUuid to %s" % (pNode, workflowUuid))

        provObjs[pNode] = pObj
    return provObjs, workflowUuid


def _storePASSObject(pObj, workflowUuid, collectionName, tsStart):
    """Send one linked provenanceObject to ES3 according to its objectType."""
    if pObj.objectType == "PROC":
        print("pObj.objectName.lower():  %s" % (pObj.objectName.lower(),))
        print("commandLine:  %s" % (pObj.commandLine,))
        childWorkflowUuid = ""
        retVal = ES3clientLib.storeTransformation(pObj.uuid, childWorkflowUuid, workflowUuid,
                                                  collectionName, pObj.objectName, pObj.userXML,
                                                  pObj.commandLine, pObj.links, tsStart)
        print("storeTransformation status=  %s" % (retVal,))
    elif pObj.objectType == "FILE":
        fn = str(pObj.filePath)
        name = str(pObj.objectName)
        # One filename may be associated with several pnodes, each with its own
        # mix of records (e.g. one has the PATH, another the NAME and INPUT
        # links). If no file path was recorded, fall back to the name.
        if fn.strip() == '':
            if name.strip() == '':
                raise RuntimeError("Neither filepath nor name found for pnode: %s" % pObj.localId)
            fn = name
        URL = ''
        calcMd5sum = 0
        status = ES3clientLib.registerFile(fn, URL, pObj.uuid, calcMd5sum, workflowUuid, pObj.links)
        print("status:  %s" % (status,))
    else:
        print("Unknown object type %s, skipping." % pObj.objectType)


def ingestPASS(inputFilename):
    """Ingest a PASS provenance XML file into ES3.

    Each /pass-data/provenance element (keyed by its pnode attribute) becomes
    a provenanceObject. Only objects that appear in at least one link are
    sent to ES3: PROC objects via ES3clientLib.storeTransformation() and FILE
    objects via ES3clientLib.registerFile(). Objects of other types are
    reported and skipped.
    """
    ES3clientLib.initES3()
    collectionName = '/IPAW-SPC/PASS2'
    tsStart = ''

    # Parse input provenance file.
    docProvenance = XMLdoc('file:%s' % inputFilename)
    provObjs, workflowUuid = _parsePASSProvenance(docProvenance)

    # Collect the source and destination UUIDs of every link; objects that
    # are not linked to anything are discarded below.
    linkedUuids = {}
    for pObj in provObjs.values():
        print("++++++++++")
        pObj.workflowUuid = workflowUuid
        for link in pObj.links:
            linkedUuids[link['fromUuid']] = 1
            linkedUuids[link['toUuid']] = 1
        pObj.list()
        print("----------\n")

    for pObj in provObjs.values():
        if pObj.uuid not in linkedUuids:
            continue
        _storePASSObject(pObj, workflowUuid, collectionName, tsStart)


def _visTrailsParameterValues(docWorkflow, functionElement):
    """Yield (and print) the 'val' attribute of each <parameter> child of a <function> element."""
    for funcEl in docWorkflow.getNodes(u'./*', functionElement):
        if funcEl.nodeName == "parameter":
            paramVal = docWorkflow.getAttrText(u'./@val', funcEl)
            print("parameter value:  %s" % (paramVal,))
            yield paramVal


def _ingestVisTrailsModule(docWorkflow, docExecLog, element, workflowId, workflowUuid, collectionName):
    """Store one VisTrails <module> element in ES3.

    The ES3 call depends on the module's name attribute:
    - FileSink: each outputName parameter is registered as a file.
    - FileSet / FileSetSink: each function parameter is stored as a
      transformation named by the parameter value.
    - List: skipped.
    - Anything else: stored as a transformation named after the module.

    The module's start time is looked up in the execution log.
    """
    moduleId = docWorkflow.getAttrText(u'./@id', element)
    uuid = localIdToUUID(moduleId)
    print("module id:  %s" % (moduleId,))

    # Read the module start time from the workflow execution file.
    tsStart = docExecLog.getAttrText(
        u"/log/session/wfExec[@wfVersion='%s']/exec[@moduleId='%s']/@tsStart"
        % (str(workflowId), str(moduleId)), '')
    print("tsStart:  %s" % (tsStart,))

    moduleElementName = docWorkflow.getAttrText(u'./@name', element)
    childWorkflowUuid = ''
    userXML = ''
    commandLineArguments = ''
    calcMd5sum = 0
    links = []

    def storeTransformation(transformationName):
        return ES3clientLib.storeTransformation(uuid, childWorkflowUuid, workflowUuid,
                                                collectionName, transformationName, userXML,
                                                commandLineArguments, links, tsStart)

    if moduleElementName == "FileSink":
        # FileSink corresponds to an ES3 file object.
        print("processing %s module" % moduleElementName)
        for mEl in docWorkflow.getNodes(u'./*', element):
            if mEl.nodeName == "function":
                if docWorkflow.getAttrText(u'./@name', mEl) == "outputName":
                    for paramVal in _visTrailsParameterValues(docWorkflow, mEl):
                        URL = ''
                        retVal = ES3clientLib.registerFile(paramVal, URL, uuid, calcMd5sum,
                                                           workflowUuid, links)
                        print("registerFile status:  %s" % (retVal,))
            elif mEl.nodeName == "location":
                # The location element seems to be display info.
                continue
            else:
                raise RuntimeError("Unknown module type: %s" % (mEl.nodeName,))
    elif moduleElementName == "FileSet":
        # FileSet is stored as an ES3 transformation object.
        print("processing %s module" % moduleElementName)
        for mEl in docWorkflow.getNodes(u'./*', element):
            if mEl.nodeName == "function":
                for paramVal in _visTrailsParameterValues(docWorkflow, mEl):
                    retVal = storeTransformation(paramVal)
                    print("Storing transformation:  %s" % (retVal,))
            elif mEl.nodeName == "location":
                # The location element seems to be display info.
                continue
            else:
                raise RuntimeError("Unknown module type: %s" % (mEl.nodeName,))
    elif moduleElementName == "FileSetSink":
        # Non-function children are ignored here, unlike FileSink/FileSet.
        for mEl in docWorkflow.getNodes(u'./*', element):
            if mEl.nodeName == "function":
                for paramVal in _visTrailsParameterValues(docWorkflow, mEl):
                    retVal = storeTransformation(paramVal)
                    print("storeTransformation status=  %s" % (retVal,))
    elif moduleElementName == "List":
        # "List" module elements just contain display info, so skip them.
        pass
    else:
        # Any other module is stored as a transformation named after the module.
        storeTransformation(moduleElementName)


def _visTrailsConnectionLink(docWorkflow, element):
    """Return the INPUT_TO link described by one VisTrails <connection> element.

    The connection's <port> children must include one "source" and one
    "destination" port. Their moduleId attributes are mapped to UUIDs.
    Raises RuntimeError for an unknown port type or a missing endpoint.
    """
    sourceUuid = None
    destinationUuid = None
    for pEl in docWorkflow.getNodes(u'./port', element):
        portId = docWorkflow.getAttrText(u'./@id', pEl)
        moduleId = docWorkflow.getAttrText(u'./@moduleId', pEl)
        portType = docWorkflow.getAttrText(u'./@type', pEl)
        if portType == "source":
            sourceUuid = localIdToUUID(moduleId)
        elif portType == "destination":
            destinationUuid = localIdToUUID(moduleId)
        else:
            raise RuntimeError("Port type unknown for port #: %s" % (portId,))

    if sourceUuid is None or destinationUuid is None:
        raise RuntimeError("Connection is missing a source or destination port")

    print("source Uuid  %s" % (sourceUuid,))
    print("destination Uuid :  %s" % (destinationUuid,))
    return {'type': INPUT_TO,
            'fromUuid': '%s' % sourceUuid,
            'toUuid': '%s' % destinationUuid,
            'identity': 0}


def ingestVisTrails(workflowFilename, executionLogFilename):
    """Ingest a VisTrails workflow file and its execution log into ES3.

    Each <module> child of /workflow is stored through
    _ingestVisTrailsModule(). Each <connection> child becomes one INPUT_TO
    link. All links are sent in a single ES3clientLib.storeLineage() call
    after every element has been processed.
    """
    print("workflowFilename:  %s" % (workflowFilename,))
    print("executionLogFilename:  %s" % (executionLogFilename,))
    workflowUuid = ES3clientLib.initES3()
    collectionName = '/IPAW-SPC/VisTrails2'

    # Parse input provenance files.
    docWorkflow = XMLdoc('file:%s' % workflowFilename)
    docExecLog = XMLdoc('file:%s' % executionLogFilename)

    workflowId = docWorkflow.getAttrText(u'/workflow/@id', '')
    print("workflowId:  %s" % (workflowId,))

    # Accumulate the links of every <connection>; the list must not be reset
    # per element, or all but one connection would be lost.
    lineageLinks = []
    for element in docWorkflow.getNodes(u'/workflow/*', ''):
        if element.nodeName == "module":
            _ingestVisTrailsModule(docWorkflow, docExecLog, element, workflowId,
                                   workflowUuid, collectionName)
        elif element.nodeName == "connection":
            lineageLinks.append(_visTrailsConnectionLink(docWorkflow, element))

    # Send lineage info to ES3
    retVal = ES3clientLib.storeLineage(workflowUuid, lineageLinks)
    print("storeLineage return status:  %s" % (retVal,))

    for key in localIdMap.keys():
        print('localId: %s ; uuid: %s' % (key, localIdMap[key]))


if __name__ == '__main__':
    main()