I have been looking into the Oozie code base trying to understand its underlying architecture.
Here are some of my findings:
There are different services in the Oozie web app, among them ActionService, AuthorizationService, CoordinatorStoreService, DagEngineService, SchemaService and UserGroupInformationService. They live in the org.apache.oozie.service package and implement the Service interface shown below.
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
/**
* A service is a component managed by the {@link Services} singleton.
*/
public interface Service {
public static final String DEFAULT_LOCK_TIMEOUT = "oozie.service.default.lock.timeout";
/**
* Prefix for all services configuration properties.
*/
public static final String CONF_PREFIX = "oozie.service.";
/**
* Constant for XCommand
*/
public static final String USE_XCOMMAND = "oozie.useXCommand";
/**
* Initialize the service. <p/> Invoked by the {@link Services} singleton at start up time.
*
* @param services services singleton initializing the service.
* @throws ServiceException thrown if the service could not initialize.
*/
public void init(Services services) throws ServiceException;
/**
* Destroy the service. <p/> Invoked by the {@link Services} singleton at shutdown time.
*/
public void destroy();
/**
* Return the public interface of the service. <p/> Services are retrieved by their public interface. Specializations
* of services must return the public interface.
*
* @return the interface of the service.
*/
public Class<? extends Service> getInterface();
/**
* Lock timeout value if service is only allowed to have one single running instance.
*/
public static long lockTimeout = Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);
}
Oozie mostly uses these services by looking them up through the Services singleton, keyed by their public interface, rather than creating them directly.
Incoming requests are handled by servlets, which in turn invoke the appropriate services to get things done.
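To make the lifecycle concrete, here is a minimal sketch of a registry in the spirit of the Services singleton: each service is initialized, stored under the class returned by getInterface(), and looked up by that interface; shutdown destroys the services in reverse start-up order. SketchServices, SketchService, GreetingService and SimpleGreetingService are hypothetical names, and the reverse-order teardown is an assumption of the sketch rather than something taken from the Oozie source.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for Oozie's Service interface.
interface SketchService {
    void init(SketchServices services);
    void destroy();
    // The key under which the service is registered and looked up.
    Class<? extends SketchService> getInterface();
}

// Hypothetical, simplified stand-in for the Services singleton.
class SketchServices {
    // Insertion order is kept so services can be destroyed in reverse order.
    private final Map<Class<? extends SketchService>, SketchService> services = new LinkedHashMap<>();

    void register(SketchService service) {
        service.init(this);                           // initialize before exposing it
        services.put(service.getInterface(), service);
    }

    <T extends SketchService> T get(Class<T> iface) {
        return iface.cast(services.get(iface));       // null if nothing is registered
    }

    void destroy() {
        List<SketchService> all = new ArrayList<>(services.values());
        Collections.reverse(all);                     // tear down in reverse start-up order
        for (SketchService s : all) {
            s.destroy();
        }
        services.clear();
    }
}

// A hypothetical service: callers look it up by this public interface.
interface GreetingService extends SketchService {
    String greet(String user);
}

class SimpleGreetingService implements GreetingService {
    private String prefix;

    public void init(SketchServices services) { prefix = "Hello, "; }
    public void destroy() { prefix = null; }
    public Class<? extends SketchService> getInterface() { return GreetingService.class; }
    public String greet(String user) { return prefix + user; }
}
```

With the registry above, services.get(GreetingService.class).greet("oozie") returns "Hello, oozie", while a lookup by the implementation class returns null, which is why specializations must report the public interface.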
There are also different action executors, each extending the abstract ActionExecutor class; among them are EmailActionExecutor and HiveActionExecutor.
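The "one executor per action type" idea can be sketched as follows. This is a hypothetical, simplified model (SketchActionExecutor, SketchEmailExecutor and SketchExecutorRegistry are made-up names); the real ActionExecutor has a richer contract, so the method set here is an assumption for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified executor base class: one subclass per action type.
abstract class SketchActionExecutor {
    private final String type;

    protected SketchActionExecutor(String type) {
        this.type = type;
    }

    String getType() {
        return type;
    }

    // Starts the action and returns an external id (e.g. a remote job id).
    abstract String start(Map<String, String> actionConf);

    // Reports whether the externally running action has finished.
    abstract boolean isCompleted(String externalStatus);
}

class SketchEmailExecutor extends SketchActionExecutor {
    SketchEmailExecutor() {
        super("email");
    }

    String start(Map<String, String> conf) {
        // A real executor would send the mail here; the sketch only echoes the recipient.
        return "email-to-" + conf.get("to");
    }

    boolean isCompleted(String externalStatus) {
        return "OK".equals(externalStatus);
    }
}

// Maps an action type (as written in the workflow definition) to its executor.
class SketchExecutorRegistry {
    private final Map<String, SketchActionExecutor> byType = new HashMap<>();

    void register(SketchActionExecutor executor) {
        byType.put(executor.getType(), executor);
    }

    SketchActionExecutor forType(String type) {
        SketchActionExecutor executor = byType.get(type);
        if (executor == null) {
            throw new IllegalArgumentException("No executor for action type: " + type);
        }
        return executor;
    }
}
```

Keeping the type string on the executor itself lets a registry dispatch purely on the action's declared type, so adding a new action kind means adding one subclass.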
There is another kind of executor: the JPA executors, which implement the JPAExecutor interface instead of extending ActionExecutor.
They are run by JPAService, the basic JPA service that provides database execution, e.g. executing a select query, an update query, or a JPAExecutor operation. OpenJPA is used for the ORM layer.
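A minimal sketch of the command style used by JPA executors: each executor is a named unit of work, and the service owns the database resource and runs executors against it. The real JPAExecutor receives a javax.persistence.EntityManager; here a plain Map of workflow id to status stands in for the database so the example runs on its own, and every name below is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for JPAExecutor: a named unit of database work.
// A Map (workflow id -> status) replaces the EntityManager in this sketch.
interface SketchDbExecutor<T> {
    String getName();
    T execute(Map<String, String> db);
}

// Hypothetical stand-in for JPAService: owns the "database" and runs executors.
class SketchJpaService {
    private final Map<String, String> db = new HashMap<>();

    <T> T execute(SketchDbExecutor<T> executor) {
        try {
            return executor.execute(db);
        } catch (RuntimeException e) {
            // Report which unit of work failed, using the executor's name.
            throw new IllegalStateException("Executor " + executor.getName() + " failed", e);
        }
    }
}

class InsertWorkflowExecutor implements SketchDbExecutor<Void> {
    private final String id;

    InsertWorkflowExecutor(String id) { this.id = id; }

    public String getName() { return "InsertWorkflowExecutor"; }

    public Void execute(Map<String, String> db) {
        db.put(id, "PREP");   // new workflows start in PREP, as in WorkflowJobBean
        return null;
    }
}

class GetWorkflowStatusExecutor implements SketchDbExecutor<String> {
    private final String id;

    GetWorkflowStatusExecutor(String id) { this.id = id; }

    public String getName() { return "GetWorkflowStatusExecutor"; }

    public String execute(Map<String, String> db) {
        String status = db.get(id);
        if (status == null) {
            throw new IllegalArgumentException("No workflow " + id);
        }
        return status;
    }
}
```

Because each executor carries a name and never holds the database resource itself, the service can centralize resource handling and error reporting.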
Standard JPA entity beans, annotated with named queries, represent the DB entities.
WorkflowJobBean is one example:
package org.apache.oozie;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.JsonUtils;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.WritableUtils;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.IOException;
import java.io.DataOutput;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import javax.persistence.Entity;
import javax.persistence.Column;
import javax.persistence.Id;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Basic;
import javax.persistence.Lob;
import javax.persistence.Table;
import javax.persistence.Transient;
import java.sql.Timestamp;
import org.apache.openjpa.persistence.jdbc.Index;
import org.apache.openjpa.persistence.jdbc.Strategy;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
@Entity
@NamedQueries({
@NamedQuery(name = "UPDATE_WORKFLOW", query = "update WorkflowJobBean w set w.appName = :appName, w.appPath = :appPath, w.conf = :conf, w.group = :groupName, w.run = :run, w.user = :user, w.createdTimestamp = :createdTime, w.endTimestamp = :endTime, w.externalId = :externalId, w.lastModifiedTimestamp = :lastModTime,w.logToken = :logToken, w.protoActionConf = :protoActionConf, w.slaXml =:slaXml, w.startTimestamp = :startTime, w.statusStr = :status, w.wfInstance = :wfInstance where w.id = :id"),
@NamedQuery(name = "UPDATE_WORKFLOW_MODTIME", query = "update WorkflowJobBean w set w.lastModifiedTimestamp = :lastModTime where w.id = :id"),
@NamedQuery(name = "UPDATE_WORKFLOW_STATUS_MODTIME", query = "update WorkflowJobBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),
@NamedQuery(name = "UPDATE_WORKFLOW_PARENT_MODIFIED", query = "update WorkflowJobBean w set w.parentId = :parentId, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),
@NamedQuery(name = "UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED", query = "update WorkflowJobBean w set w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),
@NamedQuery(name = "UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END", query = "update WorkflowJobBean w set w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime, w.endTimestamp = :endTime where w.id = :id"),
@NamedQuery(name = "UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END", query = "update WorkflowJobBean w set w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime, w.startTimestamp = :startTime, w.endTimestamp = :endTime where w.id = :id"),
@NamedQuery(name = "UPDATE_WORKFLOW_RERUN", query = "update WorkflowJobBean w set w.appName = :appName, w.protoActionConf = :protoActionConf, w.appPath = :appPath, w.conf = :conf, w.logToken = :logToken, w.user = :user, w.group = :group, w.externalId = :externalId, w.endTimestamp = :endTime, w.run = :run, w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),
@NamedQuery(name = "DELETE_WORKFLOW", query = "delete from WorkflowJobBean w where w.id = :id"),
@NamedQuery(name = "GET_WORKFLOWS", query = "select OBJECT(w) from WorkflowJobBean w order by w.startTimestamp desc"),
@NamedQuery(name = "GET_WORKFLOWS_COLUMNS", query = "select w.id, w.appName, w.statusStr, w.run, w.user, w.group, w.createdTimestamp, "
+ "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp, w.externalId from WorkflowJobBean w order by w.createdTimestamp desc"),
@NamedQuery(name = "GET_WORKFLOWS_COUNT", query = "select count(w) from WorkflowJobBean w"),
@NamedQuery(name = "GET_COMPLETED_WORKFLOWS_OLDER_THAN", query = "select w from WorkflowJobBean w where w.endTimestamp < :endTime"),
@NamedQuery(name = "GET_COMPLETED_WORKFLOWS_WITH_NO_PARENT_OLDER_THAN", query = "select w.id from WorkflowJobBean w where w.endTimestamp < :endTime and w.parentId is null"),
@NamedQuery(name = "GET_WORKFLOW", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),
@NamedQuery(name = "GET_WORKFLOW_STARTTIME", query = "select w.id, w.startTimestamp from WorkflowJobBean w where w.id = :id"),
@NamedQuery(name = "GET_WORKFLOW_USER_GROUP", query = "select w.user, w.group from WorkflowJobBean w where w.id = :id"),
@NamedQuery(name = "GET_WORKFLOW_SUSPEND", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance from WorkflowJobBean w where w.id = :id"),
@NamedQuery(name = "GET_WORKFLOW_RERUN", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.run, w.logToken, w.wfInstance from WorkflowJobBean w where w.id = :id"),
@NamedQuery(name = "GET_WORKFLOW_DEFINITION", query = "select w.id, w.user, w.group, w.appName, w.logToken, w.wfInstance from WorkflowJobBean w where w.id = :id"),
@NamedQuery(name = "GET_WORKFLOW_ACTION_OP", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.logToken, w.wfInstance, w.protoActionConf from WorkflowJobBean w where w.id = :id"),
@NamedQuery(name = "GET_WORKFLOW_KILL", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance, w.slaXml from WorkflowJobBean w where w.id = :id"),
@NamedQuery(name = "GET_WORKFLOW_RESUME", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance, w.protoActionConf from WorkflowJobBean w where w.id = :id"),
@NamedQuery(name = "GET_WORKFLOW_FOR_UPDATE", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),
@NamedQuery(name = "GET_WORKFLOW_FOR_SLA", query = "select w.id, w.statusStr, w.startTimestamp, w.endTimestamp from WorkflowJobBean w where w.id = :id"),
@NamedQuery(name = "GET_WORKFLOW_ID_FOR_EXTERNAL_ID", query = "select w.id from WorkflowJobBean w where w.externalId = :externalId"),
@NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_STATUS", query = "select count(w) from WorkflowJobBean w where w.statusStr = :status"),
@NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS", query = "select count(w) from WorkflowJobBean w where w.statusStr = :status and w.lastModifiedTimestamp > :lastModTime"),
@NamedQuery(name = "GET_WORKFLOWS_WITH_WORKFLOW_PARENT_ID", query = "select w.id from WorkflowJobBean w where w.parentId = :parentId"),
@NamedQuery(name = "GET_WORKFLOWS_WITH_COORD_PARENT_ID", query = "select w.id from WorkflowJobBean w where w.parentId like :parentId"), // when setting parentId parameter, make sure to append a '%' (percent symbol) at the end (e.g. 0000004-130709155224435-oozie-rkan-C%")
@NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_WORKFLOW_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from WorkflowJobBean w where w.parentId = :parentId and (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'SUSPENDED' OR w.endTimestamp >= :endTime)"),
@NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_COORD_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from WorkflowJobBean w where w.parentId like :parentId and (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'SUSPENDED' OR w.endTimestamp >= :endTime)"), // when setting parentId parameter, make sure to append a '%' (percent symbol) at the end (e.g. 0000004-130709155224435-oozie-rkan-C%")
@NamedQuery(name = "GET_WORKFLOW_FOR_USER", query = "select w.user from WorkflowJobBean w where w.id = :id"),
@NamedQuery(name = "GET_WORKFLOW_STATUS", query = "select w.statusStr from WorkflowJobBean w where w.id = :id")
})
@Table(name = "WF_JOBS")
public class WorkflowJobBean implements Writable, WorkflowJob, JsonBean {
@Id
private String id;
@Basic
@Column(name = "proto_action_conf")
@Lob
@Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
private StringBlob protoActionConf;
@Basic
@Column(name = "log_token")
private String logToken = null;
@Basic
@Index
@Column(name = "external_id")
private String externalId = null;
@Basic
@Index
@Column(name = "status")
private String statusStr = WorkflowJob.Status.PREP.toString();
@Basic
@Column(name = "created_time")
private java.sql.Timestamp createdTimestamp = null;
@Basic
@Column(name = "start_time")
private java.sql.Timestamp startTimestamp = null;
@Basic
@Index
@Column(name = "end_time")
private java.sql.Timestamp endTimestamp = null;
@Basic
@Index
@Column(name = "last_modified_time")
private java.sql.Timestamp lastModifiedTimestamp = null;
@Basic
@Column(name = "wf_instance")
@Lob
@Strategy("org.apache.oozie.executor.jpa.BinaryBlobValueHandler")
private BinaryBlob wfInstance ;
@Basic
@Column(name = "sla_xml")
@Lob
@Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
private StringBlob slaXml;
@Basic
@Column(name = "app_name")
private String appName = null;
@Basic
@Column(name = "app_path")
private String appPath = null;
@Basic
@Column(name = "conf")
@Lob
@Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
private StringBlob conf;
@Basic
@Column(name = "user_name")
private String user = null;
@Basic
@Column(name = "group_name")
private String group;
@Basic
@Column(name = "run")
private int run = 1;
@Basic
@Index
@Column(name = "parent_id")
private String parentId;
@Transient
private String consoleUrl;
@Transient
private List<WorkflowActionBean> actions;
/**
* Default constructor.
*/
public WorkflowJobBean() {
actions = new ArrayList<WorkflowActionBean>();
}
/**
* Serialize the workflow bean to a data output.
*
* @param dataOutput data output.
* @throws IOException thrown if the workflow bean could not be serialized.
*/
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeStr(dataOutput, getAppPath());
WritableUtils.writeStr(dataOutput, getAppName());
WritableUtils.writeStr(dataOutput, getId());
WritableUtils.writeStr(dataOutput, getParentId());
WritableUtils.writeStr(dataOutput, getConf());
WritableUtils.writeStr(dataOutput, getStatusStr());
dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1);
dataOutput.writeLong((getStartTime() != null) ? getStartTime().getTime() : -1);
dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1);
dataOutput.writeLong((getEndTime() != null) ? getEndTime().getTime() : -1);
WritableUtils.writeStr(dataOutput, getUser());
WritableUtils.writeStr(dataOutput, getGroup());
dataOutput.writeInt(getRun());
WritableUtils.writeStr(dataOutput, logToken);
WritableUtils.writeStr(dataOutput, getProtoActionConf());
}
/**
* Deserialize a workflow bean from a data input.
*
* @param dataInput data input.
* @throws IOException thrown if the workflow bean could not be deserialized.
*/
public void readFields(DataInput dataInput) throws IOException {
setAppPath(WritableUtils.readStr(dataInput));
setAppName(WritableUtils.readStr(dataInput));
setId(WritableUtils.readStr(dataInput));
setParentId(WritableUtils.readStr(dataInput));
setConf(WritableUtils.readStr(dataInput));
setStatus(WorkflowJob.Status.valueOf(WritableUtils.readStr(dataInput)));
// setStatus(WritableUtils.readStr(dataInput));
long d = dataInput.readLong();
if (d != -1) {
setCreatedTime(new Date(d));
}
d = dataInput.readLong();
if (d != -1) {
setStartTime(new Date(d));
}
d = dataInput.readLong();
if (d != -1) {
setLastModifiedTime(new Date(d));
}
d = dataInput.readLong();
if (d != -1) {
setEndTime(new Date(d));
}
setUser(WritableUtils.readStr(dataInput));
setGroup(WritableUtils.readStr(dataInput));
setRun(dataInput.readInt());
logToken = WritableUtils.readStr(dataInput);
setProtoActionConf(WritableUtils.readStr(dataInput));
// externalId is not written by write(), so there is nothing to restore here.
}
public boolean inTerminalState() {
boolean inTerminalState = false;
switch (WorkflowJob.Status.valueOf(statusStr)) {
case FAILED:
case KILLED:
case SUCCEEDED:
inTerminalState = true;
break;
default:
break;
}
return inTerminalState;
}
public String getLogToken() {
return logToken;
}
public void setLogToken(String logToken) {
this.logToken = logToken;
}
public String getSlaXml() {
return slaXml == null ? null : slaXml.getString();
}
public void setSlaXml(String slaXml) {
if (this.slaXml == null) {
this.slaXml = new StringBlob(slaXml);
}
else {
this.slaXml.setString(slaXml);
}
}
public void setSlaXmlBlob(StringBlob slaXml) {
this.slaXml = slaXml;
}
public StringBlob getSlaXmlBlob() {
return this.slaXml;
}
public WorkflowInstance getWorkflowInstance() {
return wfInstance == null ? null : get(wfInstance.getBytes());
}
public BinaryBlob getWfInstanceBlob() {
return this.wfInstance;
}
public void setWorkflowInstance(WorkflowInstance workflowInstance) {
if (this.wfInstance == null) {
this.wfInstance = new BinaryBlob(WritableUtils.toByteArray((LiteWorkflowInstance) workflowInstance), true);
}
else {
this.wfInstance.setBytes(WritableUtils.toByteArray((LiteWorkflowInstance) workflowInstance));
}
}
public void setWfInstanceBlob(BinaryBlob wfInstance) {
this.wfInstance = wfInstance;
}
public String getProtoActionConf() {
return protoActionConf == null ? null : protoActionConf.getString();
}
public void setProtoActionConf(String protoActionConf) {
if (this.protoActionConf == null) {
this.protoActionConf = new StringBlob(protoActionConf);
}
else {
this.protoActionConf.setString(protoActionConf);
}
}
public void setProtoActionConfBlob (StringBlob protoBytes) {
this.protoActionConf = protoBytes;
}
public StringBlob getProtoActionConfBlob() {
return this.protoActionConf;
}
public String getlogToken() {
return logToken;
}
public Timestamp getLastModifiedTimestamp() {
return lastModifiedTimestamp;
}
public Timestamp getStartTimestamp() {
return startTimestamp;
}
public Timestamp getCreatedTimestamp() {
return createdTimestamp;
}
public Timestamp getEndTimestamp() {
return endTimestamp;
}
public void setStatusStr (String statusStr) {
this.statusStr = statusStr;
}
public void setStatus(Status val) {
this.statusStr = val.toString();
}
@Override
public Status getStatus() {
return Status.valueOf(statusStr);
}
public String getStatusStr() {
return statusStr;
}
public void setExternalId(String externalId) {
this.externalId = externalId;
}
@Override
public String getExternalId() {
return externalId;
}
public void setLastModifiedTime(Date lastModifiedTime) {
this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
}
public Date getLastModifiedTime() {
return DateUtils.toDate(lastModifiedTimestamp);
}
public Date getCreatedTime() {
return DateUtils.toDate(createdTimestamp);
}
public void setCreatedTime(Date createdTime) {
this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime);
}
@Override
public Date getStartTime() {
return DateUtils.toDate(startTimestamp);
}
public void setStartTime(Date startTime) {
this.startTimestamp = DateUtils.convertDateToTimestamp(startTime);
}
public Date getEndTime() {
return DateUtils.toDate(endTimestamp);
}
public void setEndTime(Date endTime) {
this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
}
private WorkflowInstance get(byte[] array) {
LiteWorkflowInstance pInstance = WritableUtils.fromByteArray(array, LiteWorkflowInstance.class);
return pInstance;
}
@SuppressWarnings("unchecked")
public JSONObject toJSONObject() {
return toJSONObject("GMT");
}
@SuppressWarnings("unchecked")
public JSONObject toJSONObject(String timeZoneId) {
JSONObject json = new JSONObject();
json.put(JsonTags.WORKFLOW_APP_PATH, getAppPath());
json.put(JsonTags.WORKFLOW_APP_NAME, getAppName());
json.put(JsonTags.WORKFLOW_ID, getId());
json.put(JsonTags.WORKFLOW_EXTERNAL_ID, getExternalId());
json.put(JsonTags.WORKFLOW_PARENT_ID, getParentId());
json.put(JsonTags.WORKFLOW_CONF, getConf());
json.put(JsonTags.WORKFLOW_STATUS, getStatus().toString());
json.put(JsonTags.WORKFLOW_LAST_MOD_TIME, JsonUtils.formatDateRfc822(getLastModifiedTime(), timeZoneId));
json.put(JsonTags.WORKFLOW_CREATED_TIME, JsonUtils.formatDateRfc822(getCreatedTime(), timeZoneId));
json.put(JsonTags.WORKFLOW_START_TIME, JsonUtils.formatDateRfc822(getStartTime(), timeZoneId));
json.put(JsonTags.WORKFLOW_END_TIME, JsonUtils.formatDateRfc822(getEndTime(), timeZoneId));
json.put(JsonTags.WORKFLOW_USER, getUser());
json.put(JsonTags.WORKFLOW_GROUP, getGroup());
json.put(JsonTags.WORKFLOW_ACL, getAcl());
json.put(JsonTags.WORKFLOW_RUN, (long) getRun());
json.put(JsonTags.WORKFLOW_CONSOLE_URL, getConsoleUrl());
json.put(JsonTags.WORKFLOW_ACTIONS, WorkflowActionBean.toJSONArray(actions, timeZoneId));
json.put(JsonTags.TO_STRING, toString());
return json;
}
public String getAppPath() {
return appPath;
}
public void setAppPath(String appPath) {
this.appPath = appPath;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getConf() {
return conf == null ? null : conf.getString();
}
public void setConf(String conf) {
if (this.conf == null) {
this.conf = new StringBlob(conf);
}
else {
this.conf.setString(conf);
}
}
public void setConfBlob(StringBlob conf) {
this.conf = conf;
}
public StringBlob getConfBlob() {
return this.conf;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getGroup() {
return group;
}
@Override
public String getAcl() {
return getGroup();
}
public void setGroup(String group) {
this.group = group;
}
public int getRun() {
return run;
}
public void setRun(int run) {
this.run = run;
}
/**
* Return the workflow job console URL.
*
* @return the workflow job console URL.
*/
public String getConsoleUrl() {
return consoleUrl;
}
/**
* Return the corresponding Action ID, if any.
*
* @return the coordinator Action Id.
*/
public String getParentId() {
return parentId;
}
/**
* Set coordinator action id
*
* @param parentId : coordinator action id
*/
public void setParentId(String parentId) {
this.parentId = parentId;
}
/**
* Set the workflow job console URL.
*
* @param consoleUrl the workflow job console URL.
*/
public void setConsoleUrl(String consoleUrl) {
this.consoleUrl = consoleUrl;
}
@SuppressWarnings("unchecked")
public List<WorkflowAction> getActions() {
return (List) actions;
}
public void setActions(List<WorkflowActionBean> nodes) {
this.actions = (nodes != null) ? nodes : new ArrayList<WorkflowActionBean>();
}
@Override
public String toString() {
return MessageFormat.format("Workflow id[{0}] status[{1}]", getId(), getStatus());
}
/**
* Convert a workflows list into a JSONArray.
*
* @param workflows workflows list.
* @param timeZoneId time zone to use for dates in the JSON array.
* @return the corresponding JSON array.
*/
@SuppressWarnings("unchecked")
public static JSONArray toJSONArray(List<WorkflowJobBean> workflows, String timeZoneId) {
JSONArray array = new JSONArray();
if (workflows != null) {
for (WorkflowJobBean node : workflows) {
array.add(node.toJSONObject(timeZoneId));
}
}
return array;
}
}
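The write/readFields pair above encodes a missing date as -1. The sketch below reproduces that convention on a hypothetical two-date bean (SketchJob is not an Oozie class, and it uses writeUTF instead of Oozie's WritableUtils.writeStr, so it assumes a non-null id) and checks that a null start time survives a round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Date;

// Hypothetical, trimmed-down bean using the same "-1 means no date" encoding.
class SketchJob {
    String id;          // assumed non-null: writeUTF rejects null
    Date createdTime;
    Date startTime;

    void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeLong(createdTime != null ? createdTime.getTime() : -1);
        out.writeLong(startTime != null ? startTime.getTime() : -1);
    }

    void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        long d = in.readLong();
        createdTime = (d != -1) ? new Date(d) : null;
        d = in.readLong();
        startTime = (d != -1) ? new Date(d) : null;   // only set when a date was written
    }

    // Serializes to bytes and reads them back into a fresh instance.
    static SketchJob roundTrip(SketchJob job) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            job.write(new DataOutputStream(bytes));
            SketchJob copy = new SketchJob();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A side effect of the sentinel is that a genuine timestamp of -1 ms (one millisecond before the epoch) cannot be told apart from "no date", which is unlikely to matter for job timestamps.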
If you are interested in the relational table schema used in Oozie, take a look at the OozieSchema class in the org.apache.oozie.store package, which encapsulates the schema in Java code.
It describes four tables: Process Instance, WorkflowJob, Actions and Version.
Here is the full OozieSchema class, which defines these tables:
package org.apache.oozie.store;
import java.sql.Blob;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.oozie.util.db.Schema;
import org.apache.oozie.util.db.Schema.Column;
import org.apache.oozie.util.db.Schema.DBType;
import org.apache.oozie.util.db.Schema.Index;
import org.apache.oozie.util.db.Schema.Table;
public class OozieSchema {
private static String oozieDbName;
private static final String OOZIE_VERSION = "0.1";
public static final Map<Table, List<Column>> TABLE_COLUMNS = new HashMap<Table, List<Column>>();
static {
for (Column column : OozieColumn.values()) {
List<Column> tColumns = TABLE_COLUMNS.get(column.table());
if (tColumns == null) {
tColumns = new ArrayList<Column>();
TABLE_COLUMNS.put(column.table(), tColumns);
}
tColumns.add(column);
}
}
public static void setOozieDbName(String dbName) {
oozieDbName = dbName;
}
public static enum OozieTable implements Table {
WORKFLOWS,
ACTIONS,
WF_PROCESS_INSTANCE,
VERSION;
@Override
public String toString() {
return oozieDbName + "." + name().toUpperCase();
}
}
public static enum OozieIndex implements Index {
IDX_WF_APPNAME(OozieColumn.WF_appName),
IDX_WF_USER(OozieColumn.WF_userName),
IDX_WF_GROUP(OozieColumn.WF_groupName),
IDX_WF_STATUS(OozieColumn.WF_status),
IDX_WF_EXTERNAL_ID(OozieColumn.WF_externalId),
IDX_ACTIONS_BEGINTIME(OozieColumn.ACTIONS_pendingAge),
IDX_ACTIONS_WFID(OozieColumn.ACTIONS_wfId);
final Column column;
OozieIndex(Column column) {
this.column = column;
}
public Column column() {
return column;
}
}
public static enum OozieColumn implements Column {
// Process Instance Table
PI_wfId(OozieTable.WF_PROCESS_INSTANCE, String.class, true, 100),
PI_state(OozieTable.WF_PROCESS_INSTANCE, Blob.class, false),
// WorkflowJob Table
WF_id(OozieTable.WORKFLOWS, String.class, true, 100),
WF_externalId(OozieTable.WORKFLOWS, String.class, false, 100),
WF_appName(OozieTable.WORKFLOWS, String.class, false, 100),
WF_appPath(OozieTable.WORKFLOWS, String.class, false, 255),
WF_conf(OozieTable.WORKFLOWS, String.class, false),
WF_protoActionConf(OozieTable.WORKFLOWS, String.class, false),
WF_logToken(OozieTable.WORKFLOWS, String.class, false, 100),
WF_status(OozieTable.WORKFLOWS, String.class, false, 100),
WF_run(OozieTable.WORKFLOWS, Long.class, false),
WF_lastModTime(OozieTable.WORKFLOWS, Timestamp.class, false),
WF_createdTime(OozieTable.WORKFLOWS, Timestamp.class, false),
WF_startTime(OozieTable.WORKFLOWS, Timestamp.class, false),
WF_endTime(OozieTable.WORKFLOWS, Timestamp.class, false),
WF_userName(OozieTable.WORKFLOWS, String.class, false, 100),
WF_groupName(OozieTable.WORKFLOWS, String.class, false, 100),
WF_authToken(OozieTable.WORKFLOWS, String.class, false),
// Actions Table
ACTIONS_id(OozieTable.ACTIONS, String.class, true, 100),
ACTIONS_name(OozieTable.ACTIONS, String.class, false, 100),
ACTIONS_type(OozieTable.ACTIONS, String.class, false, 100),
ACTIONS_wfId(OozieTable.ACTIONS, String.class, false, 100),
ACTIONS_conf(OozieTable.ACTIONS, String.class, false),
ACTIONS_status(OozieTable.ACTIONS, String.class, false, 100),
ACTIONS_externalStatus(OozieTable.ACTIONS, String.class, false, 100),
ACTIONS_errorCode(OozieTable.ACTIONS, String.class, false, 100),
ACTIONS_errorMessage(OozieTable.ACTIONS, String.class, false),
ACTIONS_transition(OozieTable.ACTIONS, String.class, false, 100),
ACTIONS_retries(OozieTable.ACTIONS, Long.class, false),
ACTIONS_startTime(OozieTable.ACTIONS, Timestamp.class, false),
ACTIONS_endTime(OozieTable.ACTIONS, Timestamp.class, false),
ACTIONS_lastCheckTime(OozieTable.ACTIONS, Timestamp.class, false),
ACTIONS_data(OozieTable.ACTIONS, String.class, false),
ACTIONS_externalId(OozieTable.ACTIONS, String.class, false, 100),
ACTIONS_trackerUri(OozieTable.ACTIONS, String.class, false, 100),
ACTIONS_consoleUrl(OozieTable.ACTIONS, String.class, false, 100),
ACTIONS_executionPath(OozieTable.ACTIONS, String.class, false, 255),
ACTIONS_pending(OozieTable.ACTIONS, Boolean.class, false),
ACTIONS_pendingAge(OozieTable.ACTIONS, Timestamp.class, false),
ACTIONS_signalValue(OozieTable.ACTIONS, String.class, false, 100),
ACTIONS_logToken(OozieTable.ACTIONS, String.class, false, 100),
// Version Table
VER_versionNumber(OozieTable.VERSION, String.class, false, 255);
final Table table;
final Class<?> type;
int length = -1;
final boolean isPrimaryKey;
OozieColumn(Table table, Class<?> type, boolean isPrimaryKey) {
this(table, type, isPrimaryKey, -1);
}
OozieColumn(Table table, Class<?> type, boolean isPrimaryKey, int length) {
this.table = table;
this.type = type;
this.isPrimaryKey = isPrimaryKey;
this.length = length;
}
private String getName() {
String tName = table.name();
return tName + "." + columnName();
}
public String columnName() {
return name().split("_")[1].toLowerCase();
}
@Override
public String toString() {
return getName();
}
public Table table() {
return table;
}
public Class<?> getType() {
return type;
}
public int getLength() {
return length;
}
public String asLabel() {
return name().toUpperCase();
}
public boolean isPrimaryKey() {
return isPrimaryKey;
}
}
/**
* Generates the create table SQL Statement
*
* @param table
* @param dbType
* @return SQL Statement to create the table
*/
public static String generateCreateTableScript(Table table, DBType dbType) {
return Schema.generateCreateTableScript(table, dbType, TABLE_COLUMNS.get(table));
}
/**
* Gets the query that will be used to validate the connection
*
* @param dbName
* @return
*/
public static String getValidationQuery(String dbName) {
return "select count(" + OozieColumn.VER_versionNumber.columnName() + ") from " + dbName + "."
+ OozieTable.VERSION.name().toUpperCase();
}
/**
* Generates the Insert statement to insert the OOZIE_VERSION to table
*
* @param dbName
* @return
*/
public static String generateInsertVersionScript(String dbName) {
return "INSERT INTO " + dbName + "." + OozieTable.VERSION.name().toUpperCase() + "("
+ OozieColumn.VER_versionNumber.columnName() + ") VALUES(" + OOZIE_VERSION + ")";
}
/**
* Gets the Oozie Schema Version
*
* @return
*/
public static String getOozieVersion() {
return OOZIE_VERSION;
}
}
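To see what the OozieColumn naming convention and the two SQL helpers above actually produce, here is a small self-contained sketch. DemoTable, DemoColumn and the version value "1" are hypothetical stand-ins I made up for illustration; the real OOZIE_VERSION constant is not part of this excerpt. The columnName(), toString() and string-concatenation logic is copied from the code above.

```java
public class SchemaNamingSketch {

    // Hypothetical stand-in for OozieTable, reduced to two tables.
    enum DemoTable { ACTIONS, VERSION }

    // Hypothetical stand-in for OozieColumn, reduced to two columns.
    enum DemoColumn {
        ACTIONS_externalStatus(DemoTable.ACTIONS),
        VER_versionNumber(DemoTable.VERSION);

        final DemoTable table;

        DemoColumn(DemoTable table) {
            this.table = table;
        }

        // Same rule as OozieColumn.columnName(): the part after '_', lower-cased.
        String columnName() {
            return name().split("_")[1].toLowerCase();
        }

        // Same rule as OozieColumn.toString(): "<TABLE>.<column>", using the table
        // passed to the constructor, not the name prefix (VER vs VERSION).
        @Override
        public String toString() {
            return table.name() + "." + columnName();
        }
    }

    // Assumed value for illustration only; the real constant is not shown in the excerpt.
    static final String OOZIE_VERSION = "1";

    // Mirrors the string concatenation in getValidationQuery(dbName).
    static String validationQuery(String dbName) {
        return "select count(" + DemoColumn.VER_versionNumber.columnName() + ") from " + dbName + "."
                + DemoTable.VERSION.name().toUpperCase();
    }

    // Mirrors the string concatenation in generateInsertVersionScript(dbName).
    static String insertVersionScript(String dbName) {
        return "INSERT INTO " + dbName + "." + DemoTable.VERSION.name().toUpperCase() + "("
                + DemoColumn.VER_versionNumber.columnName() + ") VALUES(" + OOZIE_VERSION + ")";
    }

    public static void main(String[] args) {
        System.out.println(DemoColumn.ACTIONS_externalStatus); // ACTIONS.externalstatus
        System.out.println(DemoColumn.VER_versionNumber);      // VERSION.versionnumber
        System.out.println(validationQuery("oozie"));         // select count(versionnumber) from oozie.VERSION
        System.out.println(insertVersionScript("oozie"));     // INSERT INTO oozie.VERSION(versionnumber) VALUES(1)
    }
}
```

Two things stand out: columnName() lower-cases the whole camel-case suffix (externalStatus becomes externalstatus), and split("_")[1] keeps only the text between the first and second underscore, so the scheme presumably relies on column names containing no further underscores. The version value is also concatenated into VALUES(...) without quotes.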
Oozie also has several database stores, e.g. the coordinator store, the workflow store, and the SLA store. They live in the org.apache.oozie.store package.
Store is the abstract base class that separates the entities from the actual store implementation. It lets a subclass get the EntityManager and the JDBC connection, and perform transaction control, including rollback.
The DB implementations are:
CoordinatorStore, the DB implementation of the coordinator store
WorkflowStore, the DB implementation of the workflow store (its source follows)
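The Store idea (one base class holding the persistence context, with a subclass per entity family) can be sketched without JPA. In the sketch below, FakeTransaction is a hypothetical stand-in for an EntityManager's transaction, and the method names beginTrx/commitTrx/rollbackTrx are my own illustrative choices, not necessarily Oozie's exact API. The copy constructor mirrors what the WorkflowStore(Store store, ...) constructors below do with super(store): presumably the new store reuses the other store's EntityManager, so both take part in the same transaction.

```java
import java.util.ArrayList;
import java.util.List;

public class StoreSketch {

    // Hypothetical stand-in for the transaction of a JPA EntityManager; it only records calls.
    static class FakeTransaction {
        final List<String> log = new ArrayList<>();
        boolean active;

        void begin() { active = true; log.add("begin"); }
        void commit() { active = false; log.add("commit"); }
        void rollback() { active = false; log.add("rollback"); }
    }

    // Minimal analogue of the abstract Store: it either creates a transaction context
    // or reuses the one owned by another store (cf. super(store) in WorkflowStore).
    abstract static class Store {
        private final FakeTransaction trx;

        Store() { this.trx = new FakeTransaction(); }
        Store(Store other) { this.trx = other.trx; }

        FakeTransaction trx() { return trx; }
        void beginTrx() { trx.begin(); }
        void commitTrx() { trx.commit(); }
        void rollbackTrx() { if (trx.active) { trx.rollback(); } }
    }

    static class WorkflowStoreSketch extends Store {
        WorkflowStoreSketch() { super(); }
    }

    static class CoordinatorStoreSketch extends Store {
        CoordinatorStoreSketch(Store other) { super(other); }
    }

    // Runs work inside one transaction and rolls back if the work throws.
    static void inTransaction(Store store, Runnable work) {
        store.beginTrx();
        try {
            work.run();
            store.commitTrx();
        } catch (RuntimeException e) {
            store.rollbackTrx();
            throw e;
        }
    }

    public static void main(String[] args) {
        WorkflowStoreSketch wfStore = new WorkflowStoreSketch();
        CoordinatorStoreSketch coordStore = new CoordinatorStoreSketch(wfStore);
        inTransaction(wfStore, () -> { /* reads and writes would go here */ });
        try {
            inTransaction(coordStore, () -> { throw new IllegalStateException("simulated DB error"); });
        } catch (IllegalStateException expected) {
            // the failure was rolled back on the shared transaction
        }
        System.out.println(wfStore.trx() == coordStore.trx()); // true
        System.out.println(wfStore.trx().log);                 // [begin, commit, begin, rollback]
    }
}
```

Sharing one context this way would let, for example, a workflow update and a coordinator update commit or roll back together; that is my reading of why the constructors accept another Store, not something this excerpt documents.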
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.store;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.WorkflowsInfo;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob.Status;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.SchemaService.SchemaName;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.openjpa.persistence.OpenJPAEntityManager;
import org.apache.openjpa.persistence.OpenJPAPersistence;
import org.apache.openjpa.persistence.OpenJPAQuery;
import org.apache.openjpa.persistence.jdbc.FetchDirection;
import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
import org.apache.openjpa.persistence.jdbc.ResultSetType;
/**
* DB Implementation of Workflow Store
*/
public class WorkflowStore extends Store {
private Connection conn;
private EntityManager entityManager;
private boolean selectForUpdate;
private static final String INSTR_GROUP = "db";
public static final int LOCK_TIMEOUT = 50000;
private static final String seletStr = "Select w.id, w.appName, w.statusStr, w.run, w.user, w.group, w.createdTimestamp, "
+ "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp from WorkflowJobBean w";
private static final String countStr = "Select count(w) from WorkflowJobBean w";
public WorkflowStore() {
}
public WorkflowStore(Connection connection, boolean selectForUpdate) throws StoreException {
super();
conn = ParamChecker.notNull(connection, "conn");
entityManager = getEntityManager();
this.selectForUpdate = selectForUpdate;
}
public WorkflowStore(Connection connection, Store store, boolean selectForUpdate) throws StoreException {
super(store);
conn = ParamChecker.notNull(connection, "conn");
entityManager = getEntityManager();
this.selectForUpdate = selectForUpdate;
}
public WorkflowStore(boolean selectForUpdate) throws StoreException {
super();
entityManager = getEntityManager();
javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW);
OpenJPAEntityManager kem = OpenJPAPersistence.cast(entityManager);
conn = (Connection) kem.getConnection();
this.selectForUpdate = selectForUpdate;
}
public WorkflowStore(Store store, boolean selectForUpdate) throws StoreException {
super(store);
entityManager = getEntityManager();
this.selectForUpdate = selectForUpdate;
}
/**
* Persist the given Workflow job bean as a new record.
*
* @param workflow workflow bean
* @throws StoreException
*/
public void insertWorkflow(final WorkflowJobBean workflow) throws StoreException {
ParamChecker.notNull(workflow, "workflow");
doOperation("insertWorkflow", new Callable<Void>() {
public Void call() throws SQLException, StoreException, WorkflowException {
entityManager.persist(workflow);
return null;
}
});
}
/**
* Load the Workflow into a Bean and return it. Also load the Workflow Instance into the bean. And lock the Workflow
* depending on the locking parameter.
*
* @param id Workflow ID
* @param locking true if Workflow is to be locked
* @return WorkflowJobBean
* @throws StoreException
*/
public WorkflowJobBean getWorkflow(final String id, final boolean locking) throws StoreException {
ParamChecker.notEmpty(id, "WorkflowID");
WorkflowJobBean wfBean = doOperation("getWorkflow", new Callable<WorkflowJobBean>() {
public WorkflowJobBean call() throws SQLException, StoreException, WorkflowException, InterruptedException {
WorkflowJobBean wfBean = null;
wfBean = getWorkflowOnly(id, locking);
if (wfBean == null) {
throw new StoreException(ErrorCode.E0604, id);
}
/*
* WorkflowInstance wfInstance; //krishna and next line
* wfInstance = workflowLib.get(id); wfInstance =
* wfBean.get(wfBean.getWfInstance());
* wfBean.setWorkflowInstance(wfInstance);
* wfBean.setWfInstance(wfInstance);
*/
return wfBean;
}
});
return wfBean;
}
/**
* Get the number of Workflows with the given status.
*
* @param status Workflow Status.
* @return number of Workflows with given status.
* @throws StoreException
*/
public int getWorkflowCountWithStatus(final String status) throws StoreException {
ParamChecker.notEmpty(status, "status");
Integer cnt = doOperation("getWorkflowCountWithStatus", new Callable<Integer>() {
public Integer call() throws SQLException {
Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS");
q.setParameter("status", status);
Long count = (Long) q.getSingleResult();
return Integer.valueOf(count.intValue());
}
});
return cnt.intValue();
}
/**
* Get the number of Workflows with the given status which was modified in given time limit.
*
* @param status Workflow Status.
* @param secs No. of seconds within which the workflow got modified.
* @return number of Workflows modified within given time with given status.
* @throws StoreException
*/
public int getWorkflowCountWithStatusInLastNSeconds(final String status, final int secs) throws StoreException {
ParamChecker.notEmpty(status, "status");
ParamChecker.notEmpty(status, "secs");
Integer cnt = doOperation("getWorkflowCountWithStatusInLastNSecs", new Callable<Integer>() {
public Integer call() throws SQLException {
Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS");
Timestamp ts = new Timestamp(System.currentTimeMillis() - (secs * 1000));
q.setParameter("status", status);
q.setParameter("lastModTime", ts);
Long count = (Long) q.getSingleResult();
return Integer.valueOf(count.intValue());
}
});
return cnt.intValue();
}
/**
* Update the data from Workflow Bean to DB along with the workflow instance data. Action table is not updated
*
* @param wfBean Workflow Bean
* @throws StoreException If Workflow doesn't exist
*/
public void updateWorkflow(final WorkflowJobBean wfBean) throws StoreException {
ParamChecker.notNull(wfBean, "WorkflowJobBean");
doOperation("updateWorkflow", new Callable<Void>() {
public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
WorkflowJobQueryExecutor.getInstance().executeUpdate(
WorkflowJobQueryExecutor.WorkflowJobQuery.UPDATE_WORKFLOW, wfBean);
return null;
}
});
}
/**
* Create a new Action record in the ACTIONS table with the given Bean.
*
* @param action WorkflowActionBean
* @throws StoreException If the action is already present
*/
public void insertAction(final WorkflowActionBean action) throws StoreException {
ParamChecker.notNull(action, "WorkflowActionBean");
doOperation("insertAction", new Callable<Void>() {
public Void call() throws SQLException, StoreException, WorkflowException {
entityManager.persist(action);
return null;
}
});
}
/**
* Load the action data and returns a bean.
*
* @param id Action Id
* @param locking true if the action is to be locked
* @return Action Bean
* @throws StoreException If action doesn't exist
*/
public WorkflowActionBean getAction(final String id, final boolean locking) throws StoreException {
ParamChecker.notEmpty(id, "ActionID");
WorkflowActionBean action = doOperation("getAction", new Callable<WorkflowActionBean>() {
public WorkflowActionBean call() throws SQLException, StoreException, WorkflowException,
InterruptedException {
Query q = entityManager.createNamedQuery("GET_ACTION");
/*
* if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
* FetchPlan fetch = oq.getFetchPlan();
* fetch.setReadLockMode(LockModeType.WRITE);
* fetch.setLockTimeout(1000); // 1 seconds }
*/
WorkflowActionBean action = null;
q.setParameter("id", id);
List<WorkflowActionBean> actions = q.getResultList();
// action = (WorkflowActionBean) q.getSingleResult();
if (actions.size() > 0) {
action = actions.get(0);
}
else {
throw new StoreException(ErrorCode.E0605, id);
}
/*
* if (locking) return action; else
*/
// return action;
return getBeanForRunningAction(action);
}
});
return action;
}
/**
* Update the given action bean to DB.
*
* @param action Action Bean
* @throws StoreException if action doesn't exist
*/
public void updateAction(final WorkflowActionBean action) throws StoreException {
ParamChecker.notNull(action, "WorkflowActionBean");
doOperation("updateAction", new Callable<Void>() {
public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
WorkflowActionQueryExecutor.getInstance().executeUpdate(
WorkflowActionQueryExecutor.WorkflowActionQuery.UPDATE_ACTION, action);
return null;
}
});
}
/**
* Delete the Action with given id.
*
* @param id Action ID
* @throws StoreException if Action doesn't exist
*/
public void deleteAction(final String id) throws StoreException {
ParamChecker.notEmpty(id, "ActionID");
doOperation("deleteAction", new Callable<Void>() {
public Void call() throws SQLException, StoreException, WorkflowException {
/*
* Query q = entityManager.createNamedQuery("DELETE_ACTION");
* q.setParameter("id", id); q.executeUpdate();
*/
WorkflowActionBean action = entityManager.find(WorkflowActionBean.class, id);
if (action != null) {
entityManager.remove(action);
}
return null;
}
});
}
/**
* Loads all the actions for the given Workflow. Also locks all the actions if locking is true.
*
* @param wfId Workflow ID
* @param locking true if Actions are to be locked
* @return A List of WorkflowActionBean
* @throws StoreException
*/
public List<WorkflowActionBean> getActionsForWorkflow(final String wfId, final boolean locking)
throws StoreException {
ParamChecker.notEmpty(wfId, "WorkflowID");
List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
new Callable<List<WorkflowActionBean>>() {
public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
InterruptedException {
List<WorkflowActionBean> actions;
List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
try {
Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
/*
* OpenJPAQuery oq = OpenJPAPersistence.cast(q);
* if (locking) { //
* q.setHint("openjpa.FetchPlan.ReadLockMode"
* ,"WRITE"); FetchPlan fetch = oq.getFetchPlan();
* fetch.setReadLockMode(LockModeType.WRITE);
* fetch.setLockTimeout(1000); // 1 seconds }
*/
q.setParameter("wfId", wfId);
actions = q.getResultList();
for (WorkflowActionBean a : actions) {
WorkflowActionBean aa = getBeanForRunningAction(a);
actionList.add(aa);
}
}
catch (IllegalStateException e) {
throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
}
/*
* if (locking) { return actions; } else {
*/
return actionList;
// }
}
});
return actions;
}
/**
* Loads the given number of actions for the given Workflow.
*
* @param wfId Workflow ID
* @param start offset for select statement (1-based)
* @param len number of Workflow Actions to be returned
* @return A List of WorkflowActionBean
* @throws StoreException
*/
public List<WorkflowActionBean> getActionsSubsetForWorkflow(final String wfId, final int start, final int len)
throws StoreException {
ParamChecker.notEmpty(wfId, "WorkflowID");
List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
new Callable<List<WorkflowActionBean>>() {
public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
InterruptedException {
List<WorkflowActionBean> actions;
List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
try {
Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
OpenJPAQuery oq = OpenJPAPersistence.cast(q);
q.setParameter("wfId", wfId);
q.setFirstResult(start - 1);
q.setMaxResults(len);
actions = q.getResultList();
for (WorkflowActionBean a : actions) {
WorkflowActionBean aa = getBeanForRunningAction(a);
actionList.add(aa);
}
}
catch (IllegalStateException e) {
throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
}
return actionList;
}
});
return actions;
}
/**
* Load All the actions that are pending for more than given time.
*
* @param minimumPendingAgeSecs Minimum Pending age in seconds
* @return List of action beans
* @throws StoreException
*/
public List<WorkflowActionBean> getPendingActions(final long minimumPendingAgeSecs) throws StoreException {
List<WorkflowActionBean> actions = doOperation("getPendingActions", new Callable<List<WorkflowActionBean>>() {
public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
Timestamp ts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000);
List<WorkflowActionBean> actionList = null;
try {
Query q = entityManager.createNamedQuery("GET_PENDING_ACTIONS");
q.setParameter("pendingAge", ts);
actionList = q.getResultList();
}
catch (IllegalStateException e) {
throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
}
return actionList;
}
});
return actions;
}
/**
* Load all the actions that are running and were last checked after now - checkAgeSecs
*
* @param checkAgeSecs check age in seconds.
* @return List of action beans.
* @throws StoreException
*/
public List<WorkflowActionBean> getRunningActions(final long checkAgeSecs) throws StoreException {
List<WorkflowActionBean> actions = doOperation("getRunningActions", new Callable<List<WorkflowActionBean>>() {
public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
List<WorkflowActionBean> actions = new ArrayList<WorkflowActionBean>();
Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
try {
Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS");
q.setParameter("lastCheckTime", ts);
actions = q.getResultList();
}
catch (IllegalStateException e) {
throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
}
return actions;
}
});
return actions;
}
/**
* Load All the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL.
*
* @param wfId String
* @return List of action beans
* @throws StoreException
*/
public List<WorkflowActionBean> getRetryAndManualActions(final String wfId) throws StoreException {
List<WorkflowActionBean> actions = doOperation("GET_RETRY_MANUAL_ACTIONS",
new Callable<List<WorkflowActionBean>>() {
public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
List<WorkflowActionBean> actionList = null;
try {
Query q = entityManager.createNamedQuery("GET_RETRY_MANUAL_ACTIONS");
q.setParameter("wfId", wfId);
actionList = q.getResultList();
}
catch (IllegalStateException e) {
throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
}
return actionList;
}
});
return actions;
}
/**
* Loads all the jobs that are satisfying the given filter condition. Filters can be applied on user, group,
* appName, status.
*
* @param filter Filter condition
* @param start offset for select statement
* @param len number of Workflows to be returned
* @return A list of workflows
* @throws StoreException
*/
public WorkflowsInfo getWorkflowsInfo(final Map<String, List<String>> filter, final int start, final int len)
throws StoreException {
WorkflowsInfo workFlowsInfo = doOperation("getWorkflowsInfo", new Callable<WorkflowsInfo>() {
@SuppressWarnings("unchecked")
public WorkflowsInfo call() throws SQLException, StoreException {
List<String> orArray = new ArrayList<String>();
List<String> colArray = new ArrayList<String>();
List<String> valArray = new ArrayList<String>();
StringBuilder sb = new StringBuilder("");
boolean isStatus = false;
boolean isGroup = false;
boolean isAppName = false;
boolean isUser = false;
boolean isEnabled = false;
int index = 0;
for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
String colName = null;
String colVar = null;
if (entry.getKey().equals(OozieClient.FILTER_GROUP)) {
List<String> values = filter.get(OozieClient.FILTER_GROUP);
colName = "group";
for (int i = 0; i < values.size(); i++) {
colVar = "group";
colVar = colVar + index;
if (!isEnabled && !isGroup) {
sb.append(seletStr).append(" where w.group IN (:group" + index);
isGroup = true;
isEnabled = true;
}
else {
if (isEnabled && !isGroup) {
sb.append(" and w.group IN (:group" + index);
isGroup = true;
}
else {
if (isGroup) {
sb.append(", :group" + index);
}
}
}
if (i == values.size() - 1) {
sb.append(")");
}
index++;
valArray.add(values.get(i));
orArray.add(colName);
colArray.add(colVar);
}
}
else {
if (entry.getKey().equals(OozieClient.FILTER_STATUS)) {
List<String> values = filter.get(OozieClient.FILTER_STATUS);
colName = "status";
for (int i = 0; i < values.size(); i++) {
colVar = "status";
colVar = colVar + index;
if (!isEnabled && !isStatus) {
sb.append(seletStr).append(" where w.statusStr IN (:status" + index);
isStatus = true;
isEnabled = true;
}
else {
if (isEnabled && !isStatus) {
sb.append(" and w.statusStr IN (:status" + index);
isStatus = true;
}
else {
if (isStatus) {
sb.append(", :status" + index);
}
}
}
if (i == values.size() - 1) {
sb.append(")");
}
index++;
valArray.add(values.get(i));
orArray.add(colName);
colArray.add(colVar);
}
}
else {
if (entry.getKey().equals(OozieClient.FILTER_NAME)) {
List<String> values = filter.get(OozieClient.FILTER_NAME);
colName = "appName";
for (int i = 0; i < values.size(); i++) {
colVar = "appName";
colVar = colVar + index;
if (!isEnabled && !isAppName) {
sb.append(seletStr).append(" where w.appName IN (:appName" + index);
isAppName = true;
isEnabled = true;
}
else {
if (isEnabled && !isAppName) {
sb.append(" and w.appName IN (:appName" + index);
isAppName = true;
}
else {
if (isAppName) {
sb.append(", :appName" + index);
}
}
}
if (i == values.size() - 1) {
sb.append(")");
}
index++;
valArray.add(values.get(i));
orArray.add(colName);
colArray.add(colVar);
}
}
else {
if (entry.getKey().equals(OozieClient.FILTER_USER)) {
List<String> values = filter.get(OozieClient.FILTER_USER);
colName = "user";
for (int i = 0; i < values.size(); i++) {
colVar = "user";
colVar = colVar + index;
if (!isEnabled && !isUser) {
sb.append(seletStr).append(" where w.user IN (:user" + index);
isUser = true;
isEnabled = true;
}
else {
if (isEnabled && !isUser) {
sb.append(" and w.user IN (:user" + index);
isUser = true;
}
else {
if (isUser) {
sb.append(", :user" + index);
}
}
}
if (i == values.size() - 1) {
sb.append(")");
}
index++;
valArray.add(values.get(i));
orArray.add(colName);
colArray.add(colVar);
}
}
}
}
}
}
int realLen = 0;
Query q = null;
Query qTotal = null;
if (orArray.size() == 0) {
q = entityManager.createNamedQuery("GET_WORKFLOWS_COLUMNS");
q.setFirstResult(start - 1);
q.setMaxResults(len);
qTotal = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT");
}
else {
if (orArray.size() > 0) {
StringBuilder sbTotal = new StringBuilder(sb);
sb.append(" order by w.startTimestamp desc ");
XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
q = entityManager.createQuery(sb.toString());
q.setFirstResult(start - 1);
q.setMaxResults(len);
qTotal = entityManager.createQuery(sbTotal.toString().replace(seletStr, countStr));
for (int i = 0; i < orArray.size(); i++) {
q.setParameter(colArray.get(i), valArray.get(i));
qTotal.setParameter(colArray.get(i), valArray.get(i));
}
}
}
OpenJPAQuery kq = OpenJPAPersistence.cast(q);
JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
fetch.setFetchBatchSize(20);
fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
fetch.setFetchDirection(FetchDirection.FORWARD);
fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
List<?> resultList = q.getResultList();
List<Object[]> objectArrList = (List<Object[]>) resultList;
List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();
for (Object[] arr : objectArrList) {
WorkflowJobBean ww = getBeanForWorkflowFromArray(arr);
wfBeansList.add(ww);
}
realLen = ((Long) qTotal.getSingleResult()).intValue();
return new WorkflowsInfo(wfBeansList, start, len, realLen);
}
});
return workFlowsInfo;
}
/**
* Load the Workflow and all Action details and return a WorkflowJobBean. Workflow Instance is not loaded
*
* @param id Workflow Id
* @return Workflow Bean
* @throws StoreException If Workflow doesn't exist
*/
public WorkflowJobBean getWorkflowInfo(final String id) throws StoreException {
ParamChecker.notEmpty(id, "WorkflowID");
WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
WorkflowJobBean wfBean = null;
wfBean = getWorkflowforInfo(id, false);
if (wfBean == null) {
throw new StoreException(ErrorCode.E0604, id);
}
else {
wfBean.setActions(getActionsForWorkflow(id, false));
}
return wfBean;
}
});
return wfBean;
}
/**
* Load the Workflow and subset Actions details and return a WorkflowJobBean. Workflow Instance is not loaded
*
* @param id Workflow Id
* @param start offset for select statement for actions
* @param len number of Workflow Actions to be returned
* @return Workflow Bean
* @throws StoreException If Workflow doesn't exist
*/
public WorkflowJobBean getWorkflowInfoWithActionsSubset(final String id, final int start, final int len) throws StoreException {
ParamChecker.notEmpty(id, "WorkflowID");
WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
WorkflowJobBean wfBean = null;
wfBean = getWorkflowforInfo(id, false);
if (wfBean == null) {
throw new StoreException(ErrorCode.E0604, id);
}
else {
wfBean.setActions(getActionsSubsetForWorkflow(id, start, len));
}
return wfBean;
}
});
return wfBean;
}
/**
* Get the Workflow ID with given external ID which will be assigned for the subworkflows.
*
* @param externalId external ID
* @return Workflow ID
* @throws StoreException if there is no job with external ID
*/
public String getWorkflowIdForExternalId(final String externalId) throws StoreException {
ParamChecker.notEmpty(externalId, "externalId");
String wfId = doOperation("getWorkflowIdForExternalId", new Callable<String>() {
public String call() throws SQLException, StoreException {
String id = "";
Query q = entityManager.createNamedQuery("GET_WORKFLOW_ID_FOR_EXTERNAL_ID");
q.setParameter("externalId", externalId);
List<String> w = q.getResultList();
if (w.size() == 0) {
id = "";
}
else {
int index = w.size() - 1;
id = w.get(index);
}
return id;
}
});
return wfId;
}
private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
/**
* Purge the Workflows Completed older than given days.
*
* @param olderThanDays number of days for which to preserve the workflows
* @throws StoreException
*/
public void purge(final long olderThanDays, final int limit) throws StoreException {
doOperation("purge", new Callable<Void>() {
public Void call() throws SQLException, StoreException, WorkflowException {
Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
Query q = entityManager.createNamedQuery("GET_COMPLETED_WORKFLOWS_OLDER_THAN");
q.setParameter("endTime", maxEndTime);
q.setMaxResults(limit);
List<WorkflowJobBean> workflows = q.getResultList();
int actionDeleted = 0;
if (workflows.size() != 0) {
for (WorkflowJobBean w : workflows) {
String wfId = w.getId();
entityManager.remove(w);
Query g = entityManager.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW");
g.setParameter("wfId", wfId);
actionDeleted += g.executeUpdate();
}
}
XLog.getLog(getClass()).debug("ENDED Workflow Purge deleted jobs :" + workflows.size() + " and actions " + actionDeleted);
return null;
}
});
}
private <V> V doOperation(String name, Callable<V> command) throws StoreException {
try {
Instrumentation.Cron cron = new Instrumentation.Cron();
cron.start();
V retVal;
try {
retVal = command.call();
}
finally {
cron.stop();
}
Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
return retVal;
}
catch (StoreException ex) {
throw ex;
}
catch (SQLException ex) {
throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
}
catch (Exception e) {
throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
}
}
private WorkflowJobBean getWorkflowOnly(final String id, boolean locking) throws SQLException,
InterruptedException, StoreException {
WorkflowJobBean wfBean = null;
Query q = entityManager.createNamedQuery("GET_WORKFLOW");
/*
* if (locking) { // q.setHint("openjpa.FetchPlan.ReadLockMode","READ");
* OpenJPAQuery oq = OpenJPAPersistence.cast(q); FetchPlan fetch =
* oq.getFetchPlan(); fetch.setReadLockMode(LockModeType.WRITE);
* fetch.setLockTimeout(-1); // unlimited }
*/
q.setParameter("id", id);
List<WorkflowJobBean> w = q.getResultList();
if (w.size() > 0) {
wfBean = w.get(0);
}
return wfBean;
// return getBeanForRunningWorkflow(wfBean);
}
private WorkflowJobBean getWorkflowforInfo(final String id, boolean locking) throws SQLException,
InterruptedException, StoreException {
WorkflowJobBean wfBean = null;
Query q = entityManager.createNamedQuery("GET_WORKFLOW");
q.setParameter("id", id);
List<WorkflowJobBean> w = q.getResultList();
if (w.size() > 0) {
wfBean = w.get(0);
return getBeanForRunningWorkflow(wfBean);
}
return null;
}
private WorkflowJobBean getBeanForRunningWorkflow(WorkflowJobBean w) throws SQLException {
WorkflowJobBean wfBean = new WorkflowJobBean();
wfBean.setId(w.getId());
wfBean.setAppName(w.getAppName());
wfBean.setAppPath(w.getAppPath());
wfBean.setConfBlob(w.getConfBlob());
wfBean.setGroup(w.getGroup());
wfBean.setRun(w.getRun());
wfBean.setUser(w.getUser());
wfBean.setCreatedTime(w.getCreatedTime());
wfBean.setEndTime(w.getEndTime());
wfBean.setExternalId(w.getExternalId());
wfBean.setLastModifiedTime(w.getLastModifiedTime());
wfBean.setLogToken(w.getLogToken());
wfBean.setProtoActionConfBlob(w.getProtoActionConfBlob());
wfBean.setSlaXmlBlob(w.getSlaXmlBlob());
wfBean.setStartTime(w.getStartTime());
wfBean.setStatus(w.getStatus());
wfBean.setWfInstanceBlob(w.getWfInstanceBlob());
return wfBean;
}
private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {
WorkflowJobBean wfBean = new WorkflowJobBean();
wfBean.setId((String) arr[0]);
if (arr[1] != null) {
wfBean.setAppName((String) arr[1]);
}
if (arr[2] != null) {
wfBean.setStatus(Status.valueOf((String) arr[2]));
}
if (arr[3] != null) {
wfBean.setRun((Integer) arr[3]);
}
if (arr[4] != null) {
wfBean.setUser((String) arr[4]);
}
if (arr[5] != null) {
wfBean.setGroup((String) arr[5]);
}
if (arr[6] != null) {
wfBean.setCreatedTime((Timestamp) arr[6]);
}
if (arr[7] != null) {
wfBean.setStartTime((Timestamp) arr[7]);
}
if (arr[8] != null) {
wfBean.setLastModifiedTime((Timestamp) arr[8]);
}
if (arr[9] != null) {
wfBean.setEndTime((Timestamp) arr[9]);
}
return wfBean;
}
private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) throws SQLException {
if (a != null) {
WorkflowActionBean action = new WorkflowActionBean();
action.setId(a.getId());
action.setConfBlob(a.getConfBlob());
action.setConsoleUrl(a.getConsoleUrl());
action.setDataBlob(a.getDataBlob());
action.setStatsBlob(a.getStatsBlob());
action.setExternalChildIDsBlob(a.getExternalChildIDsBlob());
action.setErrorInfo(a.getErrorCode(), a.getErrorMessage());
action.setExternalId(a.getExternalId());
action.setExternalStatus(a.getExternalStatus());
action.setName(a.getName());
action.setCred(a.getCred());
action.setRetries(a.getRetries());
action.setTrackerUri(a.getTrackerUri());
action.setTransition(a.getTransition());
action.setType(a.getType());
action.setEndTime(a.getEndTime());
action.setExecutionPath(a.getExecutionPath());
action.setLastCheckTime(a.getLastCheckTime());
action.setLogToken(a.getLogToken());
if (a.isPending() == true) {
action.setPending();
}
action.setPendingAge(a.getPendingAge());
action.setSignalValue(a.getSignalValue());
action.setSlaXmlBlob(a.getSlaXmlBlob());
action.setStartTime(a.getStartTime());
action.setStatus(a.getStatus());
action.setJobId(a.getWfId());
action.setUserRetryCount(a.getUserRetryCount());
action.setUserRetryInterval(a.getUserRetryInterval());
action.setUserRetryMax(a.getUserRetryMax());
return action;
}
return null;
}
}
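Most of getWorkflowsInfo is four near-identical blocks that build a JPQL WHERE clause with named parameters. Here is a compact, self-contained sketch of the same idea, written as a lookup table plus one loop. The filter keys ("group", "status", "name", "user") are assumed to match the OozieClient.FILTER_* constants, and SELECT is a shortened version of seletStr. The real method additionally appends "order by w.startTimestamp desc", derives the count query by swapping in countStr, and falls back to the GET_WORKFLOWS_COLUMNS named query when no filter applies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WorkflowFilterQuerySketch {

    // Shortened stand-in for WorkflowStore's projection string (seletStr).
    static final String SELECT = "Select w.id, w.appName, w.statusStr from WorkflowJobBean w";

    // Filter key -> {entity field, parameter prefix}. The keys are assumed to match
    // OozieClient.FILTER_GROUP / FILTER_STATUS / FILTER_NAME / FILTER_USER.
    static final Map<String, String[]> FIELDS = new LinkedHashMap<>();
    static {
        FIELDS.put("group", new String[] {"w.group", "group"});
        FIELDS.put("status", new String[] {"w.statusStr", "status"});
        FIELDS.put("name", new String[] {"w.appName", "appName"});
        FIELDS.put("user", new String[] {"w.user", "user"});
    }

    /**
     * Builds "SELECT ... where f1 IN (:p0, :p1) and f2 IN (:p2)" and records each named
     * parameter's value in params. The counter is shared across all filters, so parameter
     * names stay unique, like the index variable in getWorkflowsInfo.
     */
    static String buildQuery(Map<String, List<String>> filter, Map<String, String> params) {
        List<String> clauses = new ArrayList<>();
        int index = 0;
        for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
            String[] field = FIELDS.get(entry.getKey());
            if (field == null || entry.getValue().isEmpty()) {
                continue; // unknown keys and empty value lists add nothing, as in the original
            }
            List<String> placeholders = new ArrayList<>();
            for (String value : entry.getValue()) {
                String param = field[1] + index++;
                placeholders.add(":" + param);
                params.put(param, value);
            }
            clauses.add(field[0] + " IN (" + String.join(", ", placeholders) + ")");
        }
        return clauses.isEmpty() ? SELECT : SELECT + " where " + String.join(" and ", clauses);
    }

    public static void main(String[] args) {
        Map<String, List<String>> filter = new LinkedHashMap<>();
        filter.put("user", Arrays.asList("alice", "bob"));
        filter.put("status", Collections.singletonList("RUNNING"));
        Map<String, String> params = new LinkedHashMap<>();
        // ... where w.user IN (:user0, :user1) and w.statusStr IN (:status2)
        System.out.println(buildQuery(filter, params));
        System.out.println(params); // {user0=alice, user1=bob, status2=RUNNING}
    }
}
```

The returned string and parameter map could then be passed to entityManager.createQuery(...) and Query.setParameter(name, value), which is what the original loop over colArray and valArray does.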
In my next blog post, I will talk more about the LauncherMapper, the DagEngine, and other components. Stay tuned.