YARN NodesListManager

In a YARN cluster, you might occasionally encounter straggler or otherwise problematic nodes that you would like to decommission so your MR jobs stop using them.

You can add those nodes to the exclude file you defined in yarn-site.xml. The property used to specify the exclude file is yarn.resourcemanager.nodes.exclude-path.

Then you can execute the following command to dynamically remove the nodes from the cluster:

yarn rmadmin -refreshNodes
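
As an aside, the same property can also be set programmatically through YarnConfiguration. Here is a minimal sketch (the file path is a made-up example); YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH is the constant behind yarn.resourcemanager.nodes.exclude-path:

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ExcludePathExample {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // equivalent to setting yarn.resourcemanager.nodes.exclude-path in yarn-site.xml
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "/etc/hadoop/conf/yarn.exclude");
    System.out.println("exclude file: "
        + conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH));
  }
}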

Behind the scenes, if you look at the ResourceManager class, it has an instance of NodesListManager. This service class is responsible for parsing the exclude file and using the resulting node list to check any NodeManager registration request.

public class ResourceManager extends CompositeService implements Recoverable {

  /**
   * Priority of the ResourceManager shutdown hook.
   */
  public static final int SHUTDOWN_HOOK_PRIORITY = 30;

  private static final Log LOG = LogFactory.getLog(ResourceManager.class);
  private static long clusterTimeStamp = System.currentTimeMillis();

  /**
   * "Always On" services. Services that need to run always irrespective of
   * the HA state of the RM.
   */
  @VisibleForTesting
  protected RMContextImpl rmContext;
  private Dispatcher rmDispatcher;
  @VisibleForTesting
  protected AdminService adminService;

  /**
   * "Active" services. Services that need to run only on the Active RM.
   * These services are managed (initialized, started, stopped) by the
   * {@link CompositeService} RMActiveServices.
   *
   * RM is active when (1) HA is disabled, or (2) HA is enabled and the RM is
   * in Active state.
   */
  protected RMActiveServices activeServices;
  protected RMSecretManagerService rmSecretManagerService;

  protected ResourceScheduler scheduler;
  protected ReservationSystem reservationSystem;
  private ClientRMService clientRM;
  protected ApplicationMasterService masterService;
  protected NMLivelinessMonitor nmLivelinessMonitor;
  protected NodesListManager nodesListManager;
 

If you check NodesListManager, there is a refreshNodes method which re-reads the configuration and then reads the excluded nodes from the specified exclude file.

Stay tuned for more info.


 public void refreshNodes(Configuration yarnConf) throws IOException,
      YarnException {
    refreshHostsReader(yarnConf);

    for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
      if (!isValidNode(nodeId.getHost())) {
        this.rmContext.getDispatcher().getEventHandler().handle(
            new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
      }
    }
  }

private void refreshHostsReader(Configuration yarnConf) throws IOException,
      YarnException {
    synchronized (hostsReader) {
      if (null == yarnConf) {
        yarnConf = new YarnConfiguration();
      }
      includesFile =
          yarnConf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
              YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH);
      excludesFile =
          yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
              YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
      hostsReader.updateFileNames(includesFile, excludesFile);
      hostsReader.refresh(
          includesFile.isEmpty() ? null : this.rmContext
              .getConfigurationProvider().getConfigurationInputStream(
                  this.conf, includesFile), excludesFile.isEmpty() ? null
              : this.rmContext.getConfigurationProvider()
                  .getConfigurationInputStream(this.conf, excludesFile));
      printConfiguredHosts();
    }
  }
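
The include/exclude bookkeeping above is handled by Hadoop's HostsFileReader. To make the semantics concrete, here is a rough, self-contained sketch of the validity check (the file path and hostname are made up, and the real isValidNode also resolves IP addresses):

import java.util.Set;
import org.apache.hadoop.util.HostsFileReader;

public class HostCheckSketch {
  public static void main(String[] args) throws Exception {
    // an empty includes path means "every host is allowed unless explicitly excluded"
    HostsFileReader reader = new HostsFileReader("", "/etc/hadoop/conf/yarn.exclude");
    String host = "worker-42.example.com";

    Set<String> includes = reader.getHosts();
    Set<String> excludes = reader.getExcludedHosts();
    boolean valid = (includes.isEmpty() || includes.contains(host))
        && !excludes.contains(host);
    System.out.println(host + " valid for registration? " + valid);
  }
}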

Cache Management in HDFS

The popularity of Spark has spurred great interest in providing a memory cache capability in HDFS. With the release of Hadoop 2.3.0, centralized cache management has been introduced.

See the following JIRA.

https://issues.apache.org/jira/browse/HDFS-4949

You can view the design doc attached to the JIRA; it is an excellent read. Kudos to both Andrew Wang and Colin Patrick McCabe, the key contributors to this feature.

This caching feature allows HDFS clients that are local to the cached data to use zero-copy reads to access the memory-cached blocks, further speeding up read operations. The mlock and mlockall native calls tell the kernel to lock a specified memory range, which keeps the cached data resident and avoids page faults.

You can learn about mlock from the following page.

https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_MRG/1.3/html/Realtime_Reference_Guide/sect-Realtime_Reference_Guide-Memory_allocation-Using_mlock_to_avoid_memory_faults.html

You can also find the Java native I/O mlock and munlock calls for memory-mapped data in org.apache.hadoop.io.nativeio.NativeIO.java

https://github.com/apache/hadoop-common/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java

and the corresponding C native code
https://github.com/apache/hadoop-common/blob/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c

/**
 * Used to manipulate the operating system cache.
 */
@VisibleForTesting
public static class CacheManipulator {
  public void mlock(String identifier, ByteBuffer buffer,
      long len) throws IOException {
    POSIX.mlock(buffer, len);
  }

  public long getMemlockLimit() {
    return NativeIO.getMemlockLimit();
  }

  public long getOperatingSystemPageSize() {
    return NativeIO.getOperatingSystemPageSize();
  }

  public void posixFadviseIfPossible(String identifier,
      FileDescriptor fd, long offset, long len, int flags)
      throws NativeIOException {
    NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset,
        len, flags);
  }

  public boolean verifyCanMlock() {
    return NativeIO.isAvailable();
  }
}
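
To get a feel for what keeping file data resident in memory means from plain Java, here is a small sketch using a memory-mapped file; note that MappedByteBuffer.load() only asks the OS to fault the pages in, whereas the DataNode goes through the native mlock path above to actually pin them (the path below is made up):

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MmapSketch {
  public static void main(String[] args) throws Exception {
    try (RandomAccessFile file = new RandomAccessFile("/tmp/block-data", "r");
         FileChannel channel = file.getChannel()) {
      // map the file into the process address space: reads go straight to the page cache
      MappedByteBuffer buf = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
      buf.load();                                   // best effort: bring the pages into memory now
      System.out.println("resident in memory? " + buf.isLoaded());
    }
  }
}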

With this feature, the NameNode has to keep track of the in-memory cached replicas on the different DataNodes. Cached-block information is exchanged between them via the heartbeat protocol. As a result, additional metadata, e.g. the cachedHosts array, has been introduced in BlockLocation, as seen below.


public class BlockLocation {
  private String[] hosts; // Datanode hostnames
  private String[] cachedHosts; // Datanode hostnames with a cached replica
  private String[] names; // Datanode IP:xferPort for accessing the block
  private String[] topologyPaths; // Full path name in network topology
  private long offset;  // Offset of the block in the file
  private long length;
  private boolean corrupt;

Stay tuned for more info…

Zero copy technique in Hadoop and Spark

What exactly are zero-copy techniques, and how can they be employed to achieve better performance in distributed systems?

If you browse the Hadoop MapReduce and Spark JIRA tickets, you will find a number of tickets related to the use of zero-copy techniques, such as memory-mapped files (mmap) and sendfile(), to improve the system.

Zero-copy techniques eliminate unnecessary data copies and context switches between application (user) space and kernel space. Please refer to the following excellent article for an in-depth explanation of these techniques.

http://www.ibm.com/developerworks/library/j-zerocopy/

Traditionally, if a server wants to send data over the network to a client, it reads the data from disk into kernel memory and then copies it into user memory. It then copies the data again from user space into the kernel buffer associated with the network stack before sending it to the network interface card. See Figure 1 (taken from the article above).

[Figure 1: the traditional data path from disk to NIC, passing through kernel and user-space buffers]

A popular zero-copy technique is sendfile(), exposed in Java as FileChannel.transferTo(). Please see the following figure (taken from the same article).

[Figure 2: the sendfile()/transferTo zero-copy data path]
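
In Java, this shows up as FileChannel.transferTo(), which asks the kernel to move the bytes from the file straight to the socket without surfacing them in user space. A minimal sketch (host, port, and file path are made up):

import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class SendFileSketch {
  public static void main(String[] args) throws Exception {
    try (FileInputStream in = new FileInputStream("/tmp/shuffle-output.bin");
         FileChannel fileChannel = in.getChannel();
         SocketChannel socket =
             SocketChannel.open(new InetSocketAddress("client-host", 9999))) {
      long position = 0;
      long remaining = fileChannel.size();
      // transferTo may move fewer bytes than requested, so loop until everything is sent
      while (remaining > 0) {
        long sent = fileChannel.transferTo(position, remaining, socket);
        position += sent;
        remaining -= sent;
      }
    }
  }
}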

As you can see, Hadoop already switched to the zero-copy transferTo call way back in version 0.18.

https://issues.apache.org/jira/browse/HADOOP-3164

Also, it uses the same technique in sending shuffle files. Please see

https://github.com/apache/hadoop-common/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java

In Spark, there is also a plan to use the same technique for sending shuffle files, targeted for the upcoming 1.2.0 release.

SPARK-2468: introduce the same transferTo technique for sending shuffle files.

https://issues.apache.org/jira/browse/SPARK-2468

Hadoop MapTask Spill Mechanism Part 2

In the last post, we looked at the MapOutputBuffer class. This is the class that maintains the in-memory buffer, the byte array byte[] kvbuffer, during the map task's intermediate output writing phase. When the buffer exceeds the threshold, it starts spilling data to disk.

Inside MapOutputBuffer, there is a thread called SpillThread.

    final ReentrantLock spillLock = new ReentrantLock();
    final Condition spillDone = spillLock.newCondition();
    final Condition spillReady = spillLock.newCondition();
    final BlockingBuffer bb = new BlockingBuffer();
    volatile boolean spillThreadRunning = false;
    final SpillThread spillThread = new SpillThread();

It is a daemon thread and is started in the init method of MapOutputBuffer.


public void init(MapOutputCollector.Context context
                    ) throws IOException, ClassNotFoundException {

      spillThread.setDaemon(true);
      spillThread.setName("SpillThread");
      spillLock.lock();
      try {
        spillThread.start();
        while (!spillThreadRunning) {
          spillDone.await();
        }
      } catch (InterruptedException e) {
        throw new IOException("Spill thread failed to initialize", e);
      } finally {
        spillLock.unlock();
      }

}

The SpillThread is signalled to write the data out to disk (startSpill()) whenever the buffer's soft limit is exceeded (bufferRemaining <= 0) while the MapTask is writing its intermediate output into the buffer.


 public synchronized void collect(K key, V value, final int partition
                                     ) throws IOException {


if (bufferRemaining <= 0) {
        // start spill if the thread is not running and the soft limit has been
        // reached
        spillLock.lock();
        try {
          do {
            if (!spillInProgress) {
              final int kvbidx = 4 * kvindex;
              final int kvbend = 4 * kvend;
              // serialized, unspilled bytes always lie between kvindex and
              // bufindex, crossing the equator. Note that any void space
              // created by a reset must be included in "used" bytes
              final int bUsed = distanceTo(kvbidx, bufindex);
              final boolean bufsoftlimit = bUsed >= softLimit;
              if ((kvbend + METASIZE) % kvbuffer.length !=
                  equator - (equator % METASIZE)) {
                // spill finished, reclaim space
                resetSpill();
                bufferRemaining = Math.min(
                    distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                    softLimit - bUsed) - METASIZE;
                continue;
              } else if (bufsoftlimit && kvindex != kvend) {
                // spill records, if any collected; check latter, as it may
                // be possible for metadata alignment to hit spill pcnt
                startSpill();
                final int avgRec = (int)
                  (mapOutputByteCounter.getCounter() /
                  mapOutputRecordCounter.getCounter());
                // leave at least half the split buffer for serialization data
                // ensure that kvindex >= bufindex
                final int distkvi = distanceTo(bufindex, kvbidx);
                final int newPos = (bufindex +
                  Math.max(2 * METASIZE - 1,
                          Math.min(distkvi / 2,
                                   distkvi / (METASIZE + avgRec) * METASIZE)))
                  % kvbuffer.length;
                setEquator(newPos);
                bufmark = bufindex = newPos;
                final int serBound = 4 * kvend;
                // bytes remaining before the lock must be held and limits
                // checked is the minimum of three arcs: the metadata space, the
                // serialization space, and the soft limit
                bufferRemaining = Math.min(
                    // metadata max
                    distanceTo(bufend, newPos),
                    Math.min(
                      // serialization max
                      distanceTo(newPos, serBound),
                      // soft limit
                      softLimit)) - 2 * METASIZE;
              }
            }
          } while (false);
        } finally {
          spillLock.unlock();
        }



}
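
The arithmetic above leans heavily on treating kvbuffer as a circular buffer. A small sketch of the distanceTo() idea (my simplification of the helper used in MapOutputBuffer) may make the arc computations easier to follow:

public class DistanceSketch {
  // distance from index i forward to index j in a circular buffer of length mod
  static int distanceTo(int i, int j, int mod) {
    return i <= j ? j - i : mod - i + j;
  }

  public static void main(String[] args) {
    // in a 100-byte buffer, going forward from position 90 to position 10 covers 20 bytes
    System.out.println(distanceTo(90, 10, 100)); // 20
    System.out.println(distanceTo(10, 90, 100)); // 80
  }
}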

Hope you all enjoy this post!

Hadoop MapTask Spill Mechanism

Have you ever wondered what the Hadoop map task's sort and spill mechanism looks like in code?

Here you go. You can browse the following class.
MapTask.java (from hadoop mapreduce project on github)

In the map task, there is an in-memory buffer that stores the output of the map task. When the buffer exceeds a threshold, it spills the data to disk. Two parameters control this behavior: io.sort.mb (default 100 MB) sets the size of the buffer, and io.sort.spill.percent (default 0.80, i.e. 80%) sets the fill threshold that triggers the spill to disk.
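
Roughly speaking, the two settings combine into a single soft limit in bytes, which is what the spill logic compares against. A back-of-the-envelope sketch (the real code applies some extra rounding for metadata alignment):

public class SoftLimitSketch {
  public static void main(String[] args) {
    int sortmb = 100;                       // io.sort.mb, in megabytes
    float spillper = 0.80f;                 // io.sort.spill.percent
    int maxMemUsage = sortmb << 20;         // 104,857,600 bytes of buffer
    int softLimit = (int) (maxMemUsage * spillper);
    System.out.println("spill kicks in around " + softLimit + " bytes used"); // ~83,886,080
  }
}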

In fact, the MapOutputBuffer is pluggable now (MAPREDUCE-4807). See the new MapOutputCollector interface below.

public interface MapOutputCollector<K, V> {
  public void init(Context context
                  ) throws IOException, ClassNotFoundException;
  public void collect(K key, V value, int partition
                     ) throws IOException, InterruptedException;
  public void close() throws IOException, InterruptedException;

  public void flush() throws IOException, InterruptedException,
                             ClassNotFoundException;

  @InterfaceAudience.LimitedPrivate({"MapReduce"})
  @InterfaceStability.Unstable
  public static class Context {
    private final MapTask mapTask;
    private final JobConf jobConf;
    private final TaskReporter reporter;

    public Context(MapTask mapTask, JobConf jobConf, TaskReporter reporter) {
      this.mapTask = mapTask;
      this.jobConf = jobConf;
      this.reporter = reporter;
    }

    public MapTask getMapTask() {
      return mapTask;
    }

    public JobConf getJobConf() {
      return jobConf;
    }

    public TaskReporter getReporter() {
      return reporter;
    }
  }
}
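
To illustrate the plug-in point, here is a skeletal, purely hypothetical implementation of the interface (DiscardingCollector is a made-up class that simply drops every record):

import java.io.IOException;
import org.apache.hadoop.mapred.MapOutputCollector;

public class DiscardingCollector<K, V> implements MapOutputCollector<K, V> {
  public void init(MapOutputCollector.Context context)
      throws IOException, ClassNotFoundException {
    // a real collector would grab the JobConf and reporter from the context here
  }

  public void collect(K key, V value, int partition)
      throws IOException, InterruptedException {
    // a real collector would buffer, sort, and spill here; this one discards everything
  }

  public void flush() throws IOException, InterruptedException, ClassNotFoundException {
  }

  public void close() throws IOException, InterruptedException {
  }
}

You could then point a job at it with something like job.set("mapreduce.job.map.output.collector.class", DiscardingCollector.class.getName()), which to my understanding is the property behind the JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR constant used in createSortingCollector below.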

In MapTask.java, createSortingCollector instantiates MapOutputBuffer, the default implementation of MapOutputCollector, via reflection.

private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
          createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
    MapOutputCollector<KEY, VALUE> collector
      = (MapOutputCollector<KEY, VALUE>)
       ReflectionUtils.newInstance(
                        job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
                        MapOutputBuffer.class, MapOutputCollector.class), job);
    LOG.info("Map output collector class = " + collector.getClass().getName());
    MapOutputCollector.Context context =
                           new MapOutputCollector.Context(this, job, reporter);
    collector.init(context);
    return collector;
  }

In MapOutputBuffer, you can see that io.sort.mb determines the size of the in-memory buffer (maxMemUsage), while io.sort.spill.percent determines the spill threshold.

//sanity checks
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
      if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
            "\": " + spillper);
      }
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
      }
      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
            QuickSort.class, IndexedSorter.class), job);
      // buffers and accounting
      int maxMemUsage = sortmb << 20;
      maxMemUsage -= maxMemUsage % METASIZE;
      kvbuffer = new byte[maxMemUsage];

To be continued…..

Different Maven Profiles for Hadoop 1 and Hadoop 2

Since we have different versions of Hadoop, sometimes we have to compile/deploy our code against one but not the other. The ideal way is to set up Maven profiles to compile and create the artifacts accordingly.

Here is the profiles section defined in the HBase project. You can create the same profiles in your own pom.

To compile against one of these profiles, pass the hadoop.profile property, e.g.
mvn -Dhadoop.profile=1.1 clean compile

If hadoop.profile is not set at all, the hadoop-2.0 profile is used by default (note its !hadoop.profile activation below).

<profiles>
		<profile>
			<id>hadoop-1.1</id>
			<activation>
				<property>
					<name>hadoop.profile</name>
					<value>1.1</value>
				</property>
			</activation>
			<dependencies>
				<dependency>
					<groupId>org.apache.hadoop</groupId>
					<artifactId>hadoop-core</artifactId>
					<version>${hadoop-1.version}</version>
				</dependency>
				<dependency>
					<groupId>org.apache.hadoop</groupId>
					<artifactId>hadoop-test</artifactId>
					<version>${hadoop-1.version}</version>
				</dependency>
			</dependencies>
		</profile>
		<profile>
			<id>hadoop-1.0</id>
			<activation>
				<property>
					<name>hadoop.profile</name>
					<value>1.0</value>
				</property>
			</activation>
			<dependencies>
				<dependency>
					<groupId>org.apache.hadoop</groupId>
					<artifactId>hadoop-core</artifactId>
					<version>${hadoop-1.version}</version>
				</dependency>
				<dependency>
					<groupId>org.apache.hadoop</groupId>
					<artifactId>hadoop-test</artifactId>
					<version>${hadoop-1.version}</version>
				</dependency>
			</dependencies>
		</profile>
		<profile>
			<id>hadoop-2.0</id>
			<activation>
				<property>
					<name>!hadoop.profile</name>
				</property>
			</activation>
			<dependencyManagement>
				<dependencies>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-mapreduce-client-core</artifactId>
						<version>${hadoop-2.version}</version>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
						<version>${hadoop-2.version}</version>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
						<version>${hadoop-2.version}</version>
						<type>test-jar</type>
						<scope>test</scope>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-hdfs</artifactId>
						<exclusions>
							<exclusion>
								<groupId>javax.servlet.jsp</groupId>
								<artifactId>jsp-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>javax.servlet</groupId>
								<artifactId>servlet-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>stax</groupId>
								<artifactId>stax-api</artifactId>
							</exclusion>
						</exclusions>
						<version>${hadoop-2.version}</version>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-hdfs</artifactId>
						<version>${hadoop-2.version}</version>
						<type>test-jar</type>
						<scope>test</scope>
						<exclusions>
							<exclusion>
								<groupId>javax.servlet.jsp</groupId>
								<artifactId>jsp-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>javax.servlet</groupId>
								<artifactId>servlet-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>stax</groupId>
								<artifactId>stax-api</artifactId>
							</exclusion>
						</exclusions>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-auth</artifactId>
						<version>${hadoop-2.version}</version>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-common</artifactId>
						<version>${hadoop-2.version}</version>
						<exclusions>
							<exclusion>
								<groupId>javax.servlet.jsp</groupId>
								<artifactId>jsp-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>javax.servlet</groupId>
								<artifactId>servlet-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>stax</groupId>
								<artifactId>stax-api</artifactId>
							</exclusion>
						</exclusions>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-client</artifactId>
						<version>${hadoop-2.version}</version>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-annotations</artifactId>
						<version>${hadoop-2.version}</version>
					</dependency>
					<!-- This was marked as test dep in earlier pom, but was scoped compile. 
						Where do we actually need it? -->
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-minicluster</artifactId>
						<version>${hadoop-2.version}</version>
						<exclusions>
							<exclusion>
								<groupId>javax.servlet.jsp</groupId>
								<artifactId>jsp-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>javax.servlet</groupId>
								<artifactId>servlet-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>stax</groupId>
								<artifactId>stax-api</artifactId>
							</exclusion>
						</exclusions>
					</dependency>
				</dependencies>
			</dependencyManagement>
		</profile>
	</profiles>

A look into Oozie internals

I have been looking into the Oozie code base trying to understand its underlying architecture.
Here are some of my findings:

There are different services in the Oozie web app. Among them are ActionService, AuthorizationService, CoordinatorStoreService, DagEngineService, SchemaService, UserGroupInformationService, etc. They live in the org.apache.oozie.service package and implement the Service interface, shown below.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

/**
 * A service is component managed by the {@link Services} singleton.
 */
public interface Service {
    public static final String DEFAULT_LOCK_TIMEOUT = "oozie.service.default.lock.timeout";

    /**
     * Prefix for all services configuration properties.
     */
    public static final String CONF_PREFIX = "oozie.service.";

    /**
     * Constant for XCommand
     */
    public static final String USE_XCOMMAND = "oozie.useXCommand";

    /**
     * Initialize the service. <p/> Invoked by the {@link Service} singleton at start up time.
     *
     * @param services services singleton initializing the service.
     * @throws ServiceException thrown if the service could not initialize.
     */
    public void init(Services services) throws ServiceException;

    /**
     * Destroy the service. <p/> Invoked by the {@link Service} singleton at shutdown time.
     */
    public void destroy();

    /**
     * Return the public interface of the service. <p/> Services are retrieved by its public interface. Specializations
     * of services must return the public interface.
     *
     * @return the interface of the service.
     */
    public Class<? extends Service> getInterface();

    /**
     * Lock timeout value if service is only allowed to have one single running instance.
     */
    public static long lockTimeout = Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);

}

The way Oozie makes use of these services is mostly through the app.getService call.
Incoming requests are handled by different servlets, which invoke the appropriate services to get things done.
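
For instance, inside a running Oozie server (where the Services singleton has been initialized), a lookup might look like this sketch; the get(Class) accessor is my reading of the Services API referenced via Services.get() in the interface above:

import org.apache.oozie.service.DagEngineService;
import org.apache.oozie.service.Services;

public class ServiceLookupSketch {
  public static void main(String[] args) {
    // Services.get() returns the singleton that owns every registered service
    DagEngineService dagService = Services.get().get(DagEngineService.class);
    System.out.println("got service: " + dagService.getClass().getName());
  }
}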

Also, there are different action executors, each extending the abstract ActionExecutor class. Among them are EmailActionExecutor and HiveActionExecutor.

There is another kind of executor, JPAExecutor, which implements a different interface.
JPAExecutors are used by JPAService, the basic JPA service that provides database execution, e.g. running a select query, an update query, or a JPAExecutor operation. OpenJPA is used as the ORM layer.
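
A rough sketch of that pattern (WorkflowJobGetJPAExecutor and the job id are assumed examples; the executor looks up a workflow row by id and JPAService runs it against the database):

import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;

public class JpaLookupSketch {
  public static void main(String[] args) throws Exception {
    // assumes a running Oozie where Services has already been initialized
    JPAService jpaService = Services.get().get(JPAService.class);
    WorkflowJobBean job =
        jpaService.execute(new WorkflowJobGetJPAExecutor("0000001-140801-oozie-W"));
    System.out.println("workflow status: " + job.getStatus());
  }
}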

Standard ORM entity beans with named queries are used to represent the DB entities.
For example, see WorkflowJobBean below.


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie;

import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.JsonUtils;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.WritableUtils;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.IOException;
import java.io.DataOutput;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import javax.persistence.Entity;
import javax.persistence.Column;
import javax.persistence.Id;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Basic;
import javax.persistence.Lob;
import javax.persistence.Table;
import javax.persistence.Transient;

import java.sql.Timestamp;

import org.apache.openjpa.persistence.jdbc.Index;
import org.apache.openjpa.persistence.jdbc.Strategy;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

@Entity

@NamedQueries({

    @NamedQuery(name = "UPDATE_WORKFLOW", query = "update WorkflowJobBean w set w.appName = :appName, w.appPath = :appPath, w.conf = :conf, w.group = :groupName, w.run = :run, w.user = :user, w.createdTimestamp = :createdTime, w.endTimestamp = :endTime, w.externalId = :externalId, w.lastModifiedTimestamp = :lastModTime,w.logToken = :logToken, w.protoActionConf = :protoActionConf, w.slaXml =:slaXml, w.startTimestamp = :startTime, w.statusStr = :status, w.wfInstance = :wfInstance where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_MODTIME", query = "update WorkflowJobBean w set w.lastModifiedTimestamp = :lastModTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_STATUS_MODTIME", query = "update WorkflowJobBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_PARENT_MODIFIED", query = "update WorkflowJobBean w set w.parentId = :parentId, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED", query = "update WorkflowJobBean w set w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END", query = "update WorkflowJobBean w set w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime, w.endTimestamp = :endTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END", query = "update WorkflowJobBean w set w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime, w.startTimestamp = :startTime, w.endTimestamp = :endTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_RERUN", query = "update WorkflowJobBean w set w.appName = :appName, w.protoActionConf = :protoActionConf, w.appPath = :appPath, w.conf = :conf, w.logToken = :logToken, w.user = :user, w.group = :group, w.externalId = :externalId, w.endTimestamp = :endTime, w.run = :run, w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),

    @NamedQuery(name = "DELETE_WORKFLOW", query = "delete from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOWS", query = "select OBJECT(w) from WorkflowJobBean w order by w.startTimestamp desc"),

    @NamedQuery(name = "GET_WORKFLOWS_COLUMNS", query = "select w.id, w.appName, w.statusStr, w.run, w.user, w.group, w.createdTimestamp, "
            + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp, w.externalId from WorkflowJobBean w order by w.createdTimestamp desc"),

    @NamedQuery(name = "GET_WORKFLOWS_COUNT", query = "select count(w) from WorkflowJobBean w"),

    @NamedQuery(name = "GET_COMPLETED_WORKFLOWS_OLDER_THAN", query = "select w from WorkflowJobBean w where w.endTimestamp < :endTime"),

    @NamedQuery(name = "GET_COMPLETED_WORKFLOWS_WITH_NO_PARENT_OLDER_THAN", query = "select w.id from WorkflowJobBean w where w.endTimestamp < :endTime and w.parentId is null"),

    @NamedQuery(name = "GET_WORKFLOW", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_STARTTIME", query = "select w.id, w.startTimestamp from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_USER_GROUP", query = "select w.user, w.group from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_SUSPEND", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance  from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_RERUN", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.run, w.logToken, w.wfInstance from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_DEFINITION", query = "select w.id, w.user, w.group, w.appName, w.logToken, w.wfInstance from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_ACTION_OP", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.logToken, w.wfInstance, w.protoActionConf from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_KILL", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance, w.slaXml from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_RESUME", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance, w.protoActionConf from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_FOR_UPDATE", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_FOR_SLA", query = "select w.id, w.statusStr, w.startTimestamp, w.endTimestamp from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_ID_FOR_EXTERNAL_ID", query = "select  w.id from WorkflowJobBean w where w.externalId = :externalId"),

    @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_STATUS", query = "select count(w) from WorkflowJobBean w where w.statusStr = :status"),

    @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS", query = "select count(w) from WorkflowJobBean w where w.statusStr = :status and w.lastModifiedTimestamp > :lastModTime"),

    @NamedQuery(name = "GET_WORKFLOWS_WITH_WORKFLOW_PARENT_ID", query = "select w.id from WorkflowJobBean w where w.parentId = :parentId"),

    @NamedQuery(name = "GET_WORKFLOWS_WITH_COORD_PARENT_ID", query = "select w.id from WorkflowJobBean w where w.parentId like :parentId"), // when setting parentId parameter, make sure to append a '%' (percent symbol) at the end (e.g. 0000004-130709155224435-oozie-rkan-C%")

    @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_WORKFLOW_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from WorkflowJobBean w where w.parentId = :parentId and (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'SUSPENDED' OR w.endTimestamp >= :endTime)"),

    @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_COORD_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from WorkflowJobBean w where w.parentId like :parentId and (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'SUSPENDED' OR w.endTimestamp >= :endTime)"), // when setting parentId parameter, make sure to append a '%' (percent symbol) at the end (e.g. 0000004-130709155224435-oozie-rkan-C%")

    @NamedQuery(name = "GET_WORKFLOW_FOR_USER", query = "select w.user from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_STATUS", query = "select w.statusStr from WorkflowJobBean w where w.id = :id")
        })
@Table(name = "WF_JOBS")
public class WorkflowJobBean implements Writable, WorkflowJob, JsonBean {

    @Id
    private String id;

    @Basic
    @Column(name = "proto_action_conf")
    @Lob
    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
    private StringBlob protoActionConf;

    @Basic
    @Column(name = "log_token")
    private String logToken = null;

    @Basic
    @Index
    @Column(name = "external_id")
    private String externalId = null;

    @Basic
    @Index
    @Column(name = "status")
    private String statusStr = WorkflowJob.Status.PREP.toString();

    @Basic
    @Column(name = "created_time")
    private java.sql.Timestamp createdTimestamp = null;

    @Basic
    @Column(name = "start_time")
    private java.sql.Timestamp startTimestamp = null;

    @Basic
    @Index
    @Column(name = "end_time")
    private java.sql.Timestamp endTimestamp = null;

    @Basic
    @Index
    @Column(name = "last_modified_time")
    private java.sql.Timestamp lastModifiedTimestamp = null;

    @Basic
    @Column(name = "wf_instance")
    @Lob
    @Strategy("org.apache.oozie.executor.jpa.BinaryBlobValueHandler")
    private BinaryBlob wfInstance ;

    @Basic
    @Column(name = "sla_xml")
    @Lob
    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
    private StringBlob slaXml;


    @Basic
    @Column(name = "app_name")
    private String appName = null;

    @Basic
    @Column(name = "app_path")
    private String appPath = null;

    @Basic
    @Column(name = "conf")
    @Lob
    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
    private StringBlob conf;

    @Basic
    @Column(name = "user_name")
    private String user = null;

    @Basic
    @Column(name = "group_name")
    private String group;

    @Basic
    @Column(name = "run")
    private int run = 1;

    @Basic
    @Index
    @Column(name = "parent_id")
    private String parentId;

    @Transient
    private String consoleUrl;

    @Transient
    private List<WorkflowActionBean> actions;


    /**
     * Default constructor.
     */
    public WorkflowJobBean() {
        actions = new ArrayList<WorkflowActionBean>();
    }

    /**
     * Serialize the workflow bean to a data output.
     *
     * @param dataOutput data output.
     * @throws IOException thrown if the workflow bean could not be serialized.
     */
    public void write(DataOutput dataOutput) throws IOException {
        WritableUtils.writeStr(dataOutput, getAppPath());
        WritableUtils.writeStr(dataOutput, getAppName());
        WritableUtils.writeStr(dataOutput, getId());
        WritableUtils.writeStr(dataOutput, getParentId());
        WritableUtils.writeStr(dataOutput, getConf());
        WritableUtils.writeStr(dataOutput, getStatusStr());
        dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1);
        dataOutput.writeLong((getStartTime() != null) ? getStartTime().getTime() : -1);
        dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1);
        dataOutput.writeLong((getEndTime() != null) ? getEndTime().getTime() : -1);
        WritableUtils.writeStr(dataOutput, getUser());
        WritableUtils.writeStr(dataOutput, getGroup());
        dataOutput.writeInt(getRun());
        WritableUtils.writeStr(dataOutput, logToken);
        WritableUtils.writeStr(dataOutput, getProtoActionConf());
    }

    /**
     * Deserialize a workflow bean from a data input.
     *
     * @param dataInput data input.
     * @throws IOException thrown if the workflow bean could not be deserialized.
     */
    public void readFields(DataInput dataInput) throws IOException {
        setAppPath(WritableUtils.readStr(dataInput));
        setAppName(WritableUtils.readStr(dataInput));
        setId(WritableUtils.readStr(dataInput));
        setParentId(WritableUtils.readStr(dataInput));
        setConf(WritableUtils.readStr(dataInput));
        setStatus(WorkflowJob.Status.valueOf(WritableUtils.readStr(dataInput)));
        // setStatus(WritableUtils.readStr(dataInput));
        long d = dataInput.readLong();
        if (d != -1) {
            setCreatedTime(new Date(d));
        }
        d = dataInput.readLong();
        if (d != -1) {
        }
        setStartTime(new Date(d));
        d = dataInput.readLong();
        if (d != -1) {
            setLastModifiedTime(new Date(d));
        }
        d = dataInput.readLong();
        if (d != -1) {
            setEndTime(new Date(d));
        }
        setUser(WritableUtils.readStr(dataInput));
        setGroup(WritableUtils.readStr(dataInput));
        setRun(dataInput.readInt());
        logToken = WritableUtils.readStr(dataInput);
        setProtoActionConf(WritableUtils.readStr(dataInput));
        setExternalId(getExternalId());
    }

    public boolean inTerminalState() {
        boolean inTerminalState = false;
        switch (WorkflowJob.Status.valueOf(statusStr)) {
            case FAILED:
            case KILLED:
            case SUCCEEDED:
                inTerminalState = true;
                break;
            default:
                break;
        }
        return inTerminalState;
    }

    public String getLogToken() {
        return logToken;
    }

    public void setLogToken(String logToken) {
        this.logToken = logToken;
    }

    public String getSlaXml() {
        return slaXml == null ? null : slaXml.getString();
    }

    public void setSlaXml(String slaXml) {
        if (this.slaXml == null) {
            this.slaXml = new StringBlob(slaXml);
        }
        else {
            this.slaXml.setString(slaXml);
        }
    }

    public void setSlaXmlBlob(StringBlob slaXml) {
        this.slaXml = slaXml;
    }

    public StringBlob getSlaXmlBlob() {
        return this.slaXml;
    }

    public WorkflowInstance getWorkflowInstance() {
        return wfInstance == null ? null : get(wfInstance.getBytes());
    }

    public BinaryBlob getWfInstanceBlob() {
        return this.wfInstance;
    }

    public void setWorkflowInstance(WorkflowInstance workflowInstance) {
        if (this.wfInstance == null) {
            this.wfInstance = new BinaryBlob(WritableUtils.toByteArray((LiteWorkflowInstance) workflowInstance), true);
        }
        else {
            this.wfInstance.setBytes(WritableUtils.toByteArray((LiteWorkflowInstance) workflowInstance));
        }
    }

    public void setWfInstanceBlob(BinaryBlob wfInstance) {
        this.wfInstance = wfInstance;
    }

    public String getProtoActionConf() {
        return protoActionConf == null ? null : protoActionConf.getString();
    }

    public void setProtoActionConf(String protoActionConf) {
        if (this.protoActionConf == null) {
            this.protoActionConf = new StringBlob(protoActionConf);
        }
        else {
            this.protoActionConf.setString(protoActionConf);
        }
    }

    public void setProtoActionConfBlob (StringBlob protoBytes) {
        this.protoActionConf = protoBytes;
    }

    public StringBlob getProtoActionConfBlob() {
        return this.protoActionConf;
    }

    public String getlogToken() {
        return logToken;
    }

    public Timestamp getLastModifiedTimestamp() {
        return lastModifiedTimestamp;
    }

    public Timestamp getStartTimestamp() {
        return startTimestamp;
    }

    public Timestamp getCreatedTimestamp() {
        return createdTimestamp;
    }

    public Timestamp getEndTimestamp() {
        return endTimestamp;
    }

    public void setStatusStr (String statusStr) {
        this.statusStr = statusStr;
    }

    public void setStatus(Status val) {
        this.statusStr = val.toString();
    }

    @Override
    public Status getStatus() {
        return Status.valueOf(statusStr);
    }

    public String getStatusStr() {
        return statusStr;
    }

    public void setExternalId(String externalId) {
        this.externalId = externalId;
    }

    @Override
    public String getExternalId() {
        return externalId;
    }

    public void setLastModifiedTime(Date lastModifiedTime) {
        this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
    }

    public Date getLastModifiedTime() {
        return DateUtils.toDate(lastModifiedTimestamp);
    }

    public Date getCreatedTime() {
        return DateUtils.toDate(createdTimestamp);
    }

    public void setCreatedTime(Date createdTime) {
        this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime);
    }

    @Override
    public Date getStartTime() {
        return DateUtils.toDate(startTimestamp);
    }

    public void setStartTime(Date startTime) {
        this.startTimestamp = DateUtils.convertDateToTimestamp(startTime);
    }

    public Date getEndTime() {
        return DateUtils.toDate(endTimestamp);
    }

    public void setEndTime(Date endTime) {
        this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
    }

    private WorkflowInstance get(byte[] array) {
        LiteWorkflowInstance pInstance = WritableUtils.fromByteArray(array, LiteWorkflowInstance.class);
        return pInstance;
    }

    @SuppressWarnings("unchecked")
    public JSONObject toJSONObject() {
        return toJSONObject("GMT");
    }

    @SuppressWarnings("unchecked")
    public JSONObject toJSONObject(String timeZoneId) {
        JSONObject json = new JSONObject();
        json.put(JsonTags.WORKFLOW_APP_PATH, getAppPath());
        json.put(JsonTags.WORKFLOW_APP_NAME, getAppName());
        json.put(JsonTags.WORKFLOW_ID, getId());
        json.put(JsonTags.WORKFLOW_EXTERNAL_ID, getExternalId());
        json.put(JsonTags.WORKFLOW_PARENT_ID, getParentId());
        json.put(JsonTags.WORKFLOW_CONF, getConf());
        json.put(JsonTags.WORKFLOW_STATUS, getStatus().toString());
        json.put(JsonTags.WORKFLOW_LAST_MOD_TIME, JsonUtils.formatDateRfc822(getLastModifiedTime(), timeZoneId));
        json.put(JsonTags.WORKFLOW_CREATED_TIME, JsonUtils.formatDateRfc822(getCreatedTime(), timeZoneId));
        json.put(JsonTags.WORKFLOW_START_TIME, JsonUtils.formatDateRfc822(getStartTime(), timeZoneId));
        json.put(JsonTags.WORKFLOW_END_TIME, JsonUtils.formatDateRfc822(getEndTime(), timeZoneId));
        json.put(JsonTags.WORKFLOW_USER, getUser());
        json.put(JsonTags.WORKFLOW_GROUP, getGroup());
        json.put(JsonTags.WORKFLOW_ACL, getAcl());
        json.put(JsonTags.WORKFLOW_RUN, (long) getRun());
        json.put(JsonTags.WORKFLOW_CONSOLE_URL, getConsoleUrl());
        json.put(JsonTags.WORKFLOW_ACTIONS, WorkflowActionBean.toJSONArray(actions, timeZoneId));
        json.put(JsonTags.TO_STRING, toString());
        return json;
    }

    public String getAppPath() {
        return appPath;
    }

    public void setAppPath(String appPath) {
        this.appPath = appPath;
    }

    public String getAppName() {
        return appName;
    }

    public void setAppName(String appName) {
        this.appName = appName;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getConf() {
        return conf == null ? null : conf.getString();
    }

    public void setConf(String conf) {
        if (this.conf == null) {
            this.conf = new StringBlob(conf);
        }
        else {
            this.conf.setString(conf);
        }
    }

    public void setConfBlob(StringBlob conf) {
        this.conf = conf;
    }

    public StringBlob getConfBlob() {
        return this.conf;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getGroup() {
        return group;
    }

    @Override
    public String getAcl() {
        return getGroup();
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public int getRun() {
        return run;
    }

    public void setRun(int run) {
        this.run = run;
    }

    /**
     * Return the workflow job console URL.
     *
     * @return the workflow job console URL.
     */
    public String getConsoleUrl() {
        return consoleUrl;
    }

    /**
     * Return the corresponding Action ID, if any.
     *
     * @return the coordinator Action Id.
     */
    public String getParentId() {
        return parentId;
    }

    /**
     * Set coordinator action id
     *
     * @param parentId : coordinator action id
     */
    public void setParentId(String parentId) {
        this.parentId = parentId;
    }

    /**
     * Set the workflow job console URL.
     *
     * @param consoleUrl the workflow job console URL.
     */
    public void setConsoleUrl(String consoleUrl) {
        this.consoleUrl = consoleUrl;
    }

    @SuppressWarnings("unchecked")
    public List<WorkflowAction> getActions() {
        return (List) actions;
    }

    public void setActions(List<WorkflowActionBean> nodes) {
        this.actions = (nodes != null) ? nodes : new ArrayList<WorkflowActionBean>();
    }

    @Override
    public String toString() {
        return MessageFormat.format("Workflow id[{0}] status[{1}]", getId(), getStatus());
    }

    /**
     * Convert a workflows list into a JSONArray.
     *
     * @param workflows workflows list.
     * @param timeZoneId time zone to use for dates in the JSON array.
     * @return the corresponding JSON array.
     */
    @SuppressWarnings("unchecked")
    public static JSONArray toJSONArray(List<WorkflowJobBean> workflows, String timeZoneId) {
        JSONArray array = new JSONArray();
        if (workflows != null) {
            for (WorkflowJobBean node : workflows) {
                array.add(node.toJSONObject(timeZoneId));
            }
        }
        return array;
    }



}

If you are interested in the relational table schema used in Oozie, you can take a look at the OozieSchema class in the org.apache.oozie.store package, which encapsulates the schema in Java code.

Oozie has the following DB tables: the Process Instance table, the WorkflowJob table, the Actions table, and the Version table. The columns (from the OozieColumn enum) are grouped by table below.

      // Process Instance Table
        PI_wfId(OozieTable.WF_PROCESS_INSTANCE, String.class, true, 100),
        PI_state(OozieTable.WF_PROCESS_INSTANCE, Blob.class, false),

        // WorkflowJob Table
        WF_id(OozieTable.WORKFLOWS, String.class, true, 100),
        WF_externalId(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_appName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_appPath(OozieTable.WORKFLOWS, String.class, false, 255),
        WF_conf(OozieTable.WORKFLOWS, String.class, false),
        WF_protoActionConf(OozieTable.WORKFLOWS, String.class, false),
        WF_logToken(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_status(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_run(OozieTable.WORKFLOWS, Long.class, false),
        WF_lastModTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_createdTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_startTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_endTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_userName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_groupName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_authToken(OozieTable.WORKFLOWS, String.class, false),

        // Actions Table
        ACTIONS_id(OozieTable.ACTIONS, String.class, true, 100),
        ACTIONS_name(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_type(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_wfId(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_conf(OozieTable.ACTIONS, String.class, false),
        ACTIONS_status(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_externalStatus(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_errorCode(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_errorMessage(OozieTable.ACTIONS, String.class, false),
        ACTIONS_transition(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_retries(OozieTable.ACTIONS, Long.class, false),
        ACTIONS_startTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_endTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_lastCheckTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_data(OozieTable.ACTIONS, String.class, false),
        ACTIONS_externalId(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_trackerUri(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_consoleUrl(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_executionPath(OozieTable.ACTIONS, String.class, false, 255),
        ACTIONS_pending(OozieTable.ACTIONS, Boolean.class, false),
        ACTIONS_pendingAge(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_signalValue(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_logToken(OozieTable.ACTIONS, String.class, false, 100),

        // Version Table
        VER_versionNumber(OozieTable.VERSION, String.class, false, 255);


Here is the full OozieSchema class, which defines the DB tables used in Oozie.


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.store;

import java.sql.Blob;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.oozie.util.db.Schema;
import org.apache.oozie.util.db.Schema.Column;
import org.apache.oozie.util.db.Schema.DBType;
import org.apache.oozie.util.db.Schema.Index;
import org.apache.oozie.util.db.Schema.Table;

public class OozieSchema {

    private static String oozieDbName;

    private static final String OOZIE_VERSION = "0.1";

    public static final Map<Table, List<Column>> TABLE_COLUMNS = new HashMap<Table, List<Column>>();

    static {
        for (Column column : OozieColumn.values()) {
            List<Column> tColumns = TABLE_COLUMNS.get(column.table());
            if (tColumns == null) {
                tColumns = new ArrayList<Column>();
                TABLE_COLUMNS.put(column.table(), tColumns);
            }
            tColumns.add(column);
        }
    }

    public static void setOozieDbName(String dbName) {
        oozieDbName = dbName;
    }

    public static enum OozieTable implements Table {
        WORKFLOWS,
        ACTIONS,
        WF_PROCESS_INSTANCE,
        VERSION;

        @Override
        public String toString() {
            return oozieDbName + "." + name().toUpperCase();
        }
    }

    public static enum OozieIndex implements Index {
        IDX_WF_APPNAME(OozieColumn.WF_appName),
        IDX_WF_USER(OozieColumn.WF_userName),
        IDX_WF_GROUP(OozieColumn.WF_groupName),
        IDX_WF_STATUS(OozieColumn.WF_status),
        IDX_WF_EXTERNAL_ID(OozieColumn.WF_externalId),

        IDX_ACTIONS_BEGINTIME(OozieColumn.ACTIONS_pendingAge),
        IDX_ACTIONS_WFID(OozieColumn.ACTIONS_wfId);

        final Column column;

        OozieIndex(Column column) {
            this.column = column;
        }

        public Column column() {
            return column;
        }
    }

     public static enum OozieColumn implements Column {
        // Process Instance Table
        PI_wfId(OozieTable.WF_PROCESS_INSTANCE, String.class, true, 100),
        PI_state(OozieTable.WF_PROCESS_INSTANCE, Blob.class, false),

        // WorkflowJob Table
        WF_id(OozieTable.WORKFLOWS, String.class, true, 100),
        WF_externalId(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_appName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_appPath(OozieTable.WORKFLOWS, String.class, false, 255),
        WF_conf(OozieTable.WORKFLOWS, String.class, false),
        WF_protoActionConf(OozieTable.WORKFLOWS, String.class, false),
        WF_logToken(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_status(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_run(OozieTable.WORKFLOWS, Long.class, false),
        WF_lastModTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_createdTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_startTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_endTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_userName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_groupName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_authToken(OozieTable.WORKFLOWS, String.class, false),

        // Actions Table
        ACTIONS_id(OozieTable.ACTIONS, String.class, true, 100),
        ACTIONS_name(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_type(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_wfId(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_conf(OozieTable.ACTIONS, String.class, false),
        ACTIONS_status(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_externalStatus(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_errorCode(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_errorMessage(OozieTable.ACTIONS, String.class, false),
        ACTIONS_transition(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_retries(OozieTable.ACTIONS, Long.class, false),
        ACTIONS_startTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_endTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_lastCheckTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_data(OozieTable.ACTIONS, String.class, false),
        ACTIONS_externalId(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_trackerUri(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_consoleUrl(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_executionPath(OozieTable.ACTIONS, String.class, false, 255),
        ACTIONS_pending(OozieTable.ACTIONS, Boolean.class, false),
        ACTIONS_pendingAge(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_signalValue(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_logToken(OozieTable.ACTIONS, String.class, false, 100),

        // Version Table
        VER_versionNumber(OozieTable.VERSION, String.class, false, 255);

        final Table table;
        final Class<?> type;
        int length = -1;
        final boolean isPrimaryKey;

        OozieColumn(Table table, Class<?> type, boolean isPrimaryKey) {
            this(table, type, isPrimaryKey, -1);
        }

        OozieColumn(Table table, Class<?> type, boolean isPrimaryKey, int length) {
            this.table = table;
            this.type = type;
            this.isPrimaryKey = isPrimaryKey;
            this.length = length;
        }

        private String getName() {
            String tName = table.name();
            return tName + "." + columnName();
        }

        public String columnName() {
            return name().split("_")[1].toLowerCase();
        }

        @Override
        public String toString() {
            return getName();
        }

        public Table table() {
            return table;
        }

        public Class<?> getType() {
            return type;
        }

        public int getLength() {
            return length;
        }

        public String asLabel() {
            return name().toUpperCase();
        }

        public boolean isPrimaryKey() {
            return isPrimaryKey;
        }
    }

    /**
     * Generates the create table SQL statement.
     *
     * @param table table to generate the statement for
     * @param dbType database type
     * @return SQL statement to create the table
     */
    public static String generateCreateTableScript(Table table, DBType dbType) {
        return Schema.generateCreateTableScript(table, dbType, TABLE_COLUMNS.get(table));
    }

    /**
     * Gets the query that will be used to validate the connection.
     *
     * @param dbName database name
     * @return the connection validation query
     */
    public static String getValidationQuery(String dbName) {
        return "select count(" + OozieColumn.VER_versionNumber.columnName() + ") from " + dbName + "."
                + OozieTable.VERSION.name().toUpperCase();
    }

    /**
     * Generates the insert statement that records the OOZIE_VERSION in the version table.
     *
     * @param dbName database name
     * @return the insert statement
     */
    public static String generateInsertVersionScript(String dbName) {
        return "INSERT INTO " + dbName + "." + OozieTable.VERSION.name().toUpperCase() + "("
                + OozieColumn.VER_versionNumber.columnName() + ") VALUES(" + OOZIE_VERSION + ")";
    }

    /**
     * Gets the Oozie schema version.
     *
     * @return the Oozie schema version
     */
    public static String getOozieVersion() {
        return OOZIE_VERSION;
    }
}
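
To make those helpers concrete, here is a quick sketch of what they evaluate to for a database named "oozie" (assuming the enclosing class is Oozie's OozieSchema; the actual OOZIE_VERSION constant is not shown above):

// Hypothetical usage; OozieSchema is assumed to be the class shown above.
String validation = OozieSchema.getValidationQuery("oozie");
// -> "select count(versionnumber) from oozie.VERSION"
// columnName() strips the enum prefix ("VER_") and lower-cases the rest,
// which is where "versionnumber" comes from.

String insertVersion = OozieSchema.generateInsertVersionScript("oozie");
// -> "INSERT INTO oozie.VERSION(versionnumber) VALUES(<OOZIE_VERSION>)"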

In Oozie, there are several DB stores, e.g. the Coordinator Store, the Workflow Store, and the SLA Store. They live in the package org.apache.oozie.store.

Store is the abstract base class that separates the entities from the actual store implementation. It lets a store obtain the EntityManager and the JDBC Connection, and it provides transaction control (begin, commit, rollback).

The DB implementations are:
CoordinatorStore - DB implementation of the Coordinator Store
WorkflowStore - DB implementation of the Workflow Store
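
Below is a hedged sketch of how a command typically obtains a WorkflowStore and runs work inside a transaction. The service lookup (WorkflowStoreService.create()) and the transaction helpers (beginTrx/commitTrx/rollbackTrx/closeTrx) are assumptions based on the Oozie code base and are not shown in the source that follows; only getWorkflow() and updateWorkflow() appear below.

// Hedged sketch; the service lookup and transaction method names are assumptions.
void suspendWorkflow(String jobId) throws StoreException {
    WorkflowStore store = Services.get().get(WorkflowStoreService.class).create();
    try {
        store.beginTrx();
        WorkflowJobBean wf = store.getWorkflow(jobId, true); // lock the workflow row
        wf.setStatus(WorkflowJob.Status.SUSPENDED);
        store.updateWorkflow(wf);
        store.commitTrx();
    }
    catch (StoreException e) {
        store.rollbackTrx();
        throw e;
    }
    finally {
        store.closeTrx();
    }
}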


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.store;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import javax.persistence.EntityManager;
import javax.persistence.Query;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.WorkflowsInfo;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob.Status;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.SchemaService.SchemaName;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.openjpa.persistence.OpenJPAEntityManager;
import org.apache.openjpa.persistence.OpenJPAPersistence;
import org.apache.openjpa.persistence.OpenJPAQuery;
import org.apache.openjpa.persistence.jdbc.FetchDirection;
import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
import org.apache.openjpa.persistence.jdbc.ResultSetType;

/**
 * DB Implementation of Workflow Store
 */
public class WorkflowStore extends Store {
    private Connection conn;
    private EntityManager entityManager;
    private boolean selectForUpdate;
    private static final String INSTR_GROUP = "db";
    public static final int LOCK_TIMEOUT = 50000;
    private static final String seletStr = "Select w.id, w.appName, w.statusStr, w.run, w.user, w.group, w.createdTimestamp, "
            + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp from WorkflowJobBean w";
    private static final String countStr = "Select count(w) from WorkflowJobBean w";

    public WorkflowStore() {
    }

    public WorkflowStore(Connection connection, boolean selectForUpdate) throws StoreException {
        super();
        conn = ParamChecker.notNull(connection, "conn");
        entityManager = getEntityManager();
        this.selectForUpdate = selectForUpdate;
    }

    public WorkflowStore(Connection connection, Store store, boolean selectForUpdate) throws StoreException {
        super(store);
        conn = ParamChecker.notNull(connection, "conn");
        entityManager = getEntityManager();
        this.selectForUpdate = selectForUpdate;
    }

    public WorkflowStore(boolean selectForUpdate) throws StoreException {
        super();
        entityManager = getEntityManager();
        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW);
        OpenJPAEntityManager kem = OpenJPAPersistence.cast(entityManager);
        conn = (Connection) kem.getConnection();
        this.selectForUpdate = selectForUpdate;
    }

    public WorkflowStore(Store store, boolean selectForUpdate) throws StoreException {
        super(store);
        entityManager = getEntityManager();
        this.selectForUpdate = selectForUpdate;
    }

    /**
     * Create a Workflow record from the given WorkflowJobBean. It also creates the process instance for the job.
     *
     * @param workflow workflow bean
     * @throws StoreException
     */

    public void insertWorkflow(final WorkflowJobBean workflow) throws StoreException {
        ParamChecker.notNull(workflow, "workflow");

        doOperation("insertWorkflow", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                entityManager.persist(workflow);
                return null;
            }
        });
    }

    /**
     * Load the Workflow into a Bean and return it. Also load the Workflow Instance into the bean. And lock the Workflow
     * depending on the locking parameter.
     *
     * @param id Workflow ID
     * @param locking true if Workflow is to be locked
     * @return WorkflowJobBean
     * @throws StoreException
     */
    public WorkflowJobBean getWorkflow(final String id, final boolean locking) throws StoreException {
        ParamChecker.notEmpty(id, "WorkflowID");
        WorkflowJobBean wfBean = doOperation("getWorkflow", new Callable<WorkflowJobBean>() {
            public WorkflowJobBean call() throws SQLException, StoreException, WorkflowException, InterruptedException {
                WorkflowJobBean wfBean = null;
                wfBean = getWorkflowOnly(id, locking);
                if (wfBean == null) {
                    throw new StoreException(ErrorCode.E0604, id);
                }
                /*
                 * WorkflowInstance wfInstance; //krishna and next line
                 * wfInstance = workflowLib.get(id); wfInstance =
                 * wfBean.get(wfBean.getWfInstance());
                 * wfBean.setWorkflowInstance(wfInstance);
                 * wfBean.setWfInstance(wfInstance);
                 */
                return wfBean;
            }
        });
        return wfBean;
    }

    /**
     * Get the number of Workflows with the given status.
     *
     * @param status Workflow Status.
     * @return number of Workflows with given status.
     * @throws StoreException
     */
    public int getWorkflowCountWithStatus(final String status) throws StoreException {
        ParamChecker.notEmpty(status, "status");
        Integer cnt = doOperation("getWorkflowCountWithStatus", new Callable<Integer>() {
            public Integer call() throws SQLException {
                Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS");
                q.setParameter("status", status);
                Long count = (Long) q.getSingleResult();
                return Integer.valueOf(count.intValue());
            }
        });
        return cnt.intValue();
    }

    /**
     * Get the number of Workflows with the given status which was modified in given time limit.
     *
     * @param status Workflow Status.
     * @param secs No. of seconds within which the workflow got modified.
     * @return number of Workflows modified within given time with given status.
     * @throws StoreException
     */
    public int getWorkflowCountWithStatusInLastNSeconds(final String status, final int secs) throws StoreException {
        ParamChecker.notEmpty(status, "status");
        ParamChecker.notEmpty(status, "secs");
        Integer cnt = doOperation("getWorkflowCountWithStatusInLastNSecs", new Callable<Integer>() {
            public Integer call() throws SQLException {
                Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS");
                Timestamp ts = new Timestamp(System.currentTimeMillis() - (secs * 1000));
                q.setParameter("status", status);
                q.setParameter("lastModTime", ts);
                Long count = (Long) q.getSingleResult();
                return Integer.valueOf(count.intValue());
            }
        });
        return cnt.intValue();
    }

    /**
     * Update the data from Workflow Bean to DB along with the workflow instance data. Action table is not updated
     *
     * @param wfBean Workflow Bean
     * @throws StoreException If Workflow doesn't exist
     */
    public void updateWorkflow(final WorkflowJobBean wfBean) throws StoreException {
        ParamChecker.notNull(wfBean, "WorkflowJobBean");
        doOperation("updateWorkflow", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
                WorkflowJobQueryExecutor.getInstance().executeUpdate(
                        WorkflowJobQueryExecutor.WorkflowJobQuery.UPDATE_WORKFLOW, wfBean);
                return null;
            }
        });
    }

    /**
     * Create a new Action record in the ACTIONS table with the given Bean.
     *
     * @param action WorkflowActionBean
     * @throws StoreException If the action is already present
     */
    public void insertAction(final WorkflowActionBean action) throws StoreException {
        ParamChecker.notNull(action, "WorkflowActionBean");
        doOperation("insertAction", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                entityManager.persist(action);
                return null;
            }
        });
    }

    /**
     * Load the action data and returns a bean.
     *
     * @param id Action Id
     * @param locking true if the action is to be locked
     * @return Action Bean
     * @throws StoreException If action doesn't exist
     */
    public WorkflowActionBean getAction(final String id, final boolean locking) throws StoreException {
        ParamChecker.notEmpty(id, "ActionID");
        WorkflowActionBean action = doOperation("getAction", new Callable<WorkflowActionBean>() {
            public WorkflowActionBean call() throws SQLException, StoreException, WorkflowException,
                    InterruptedException {
                Query q = entityManager.createNamedQuery("GET_ACTION");
                /*
                 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
                 * FetchPlan fetch = oq.getFetchPlan();
                 * fetch.setReadLockMode(LockModeType.WRITE);
                 * fetch.setLockTimeout(1000); // 1 seconds }
                 */
                WorkflowActionBean action = null;
                q.setParameter("id", id);
                List<WorkflowActionBean> actions = q.getResultList();
                // action = (WorkflowActionBean) q.getSingleResult();
                if (actions.size() > 0) {
                    action = actions.get(0);
                }
                else {
                    throw new StoreException(ErrorCode.E0605, id);
                }

                /*
                 * if (locking) return action; else
                 */
                // return action;
                return getBeanForRunningAction(action);
            }
        });
        return action;
    }

    /**
     * Update the given action bean to DB.
     *
     * @param action Action Bean
     * @throws StoreException if action doesn't exist
     */
    public void updateAction(final WorkflowActionBean action) throws StoreException {
        ParamChecker.notNull(action, "WorkflowActionBean");
        doOperation("updateAction", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
                WorkflowActionQueryExecutor.getInstance().executeUpdate(
                        WorkflowActionQueryExecutor.WorkflowActionQuery.UPDATE_ACTION, action);
                return null;
            }
        });
    }

    /**
     * Delete the Action with given id.
     *
     * @param id Action ID
     * @throws StoreException if Action doesn't exist
     */
    public void deleteAction(final String id) throws StoreException {
        ParamChecker.notEmpty(id, "ActionID");
        doOperation("deleteAction", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                /*
                 * Query q = entityManager.createNamedQuery("DELETE_ACTION");
                 * q.setParameter("id", id); q.executeUpdate();
                 */
                WorkflowActionBean action = entityManager.find(WorkflowActionBean.class, id);
                if (action != null) {
                    entityManager.remove(action);
                }
                return null;
            }
        });
    }

    /**
     * Loads all the actions for the given Workflow. Also locks all the actions if locking is true.
     *
     * @param wfId Workflow ID
     * @param locking true if Actions are to be locked
     * @return A List of WorkflowActionBean
     * @throws StoreException
     */
    public List<WorkflowActionBean> getActionsForWorkflow(final String wfId, final boolean locking)
            throws StoreException {
        ParamChecker.notEmpty(wfId, "WorkflowID");
        List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
                                                       new Callable<List<WorkflowActionBean>>() {
                                                           public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
                                                                   InterruptedException {
                                                               List<WorkflowActionBean> actions;
                                                               List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
                                                               try {
                                                                   Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");

                                                                   /*
                                                                   * OpenJPAQuery oq = OpenJPAPersistence.cast(q);
                                                                   * if (locking) { //
                                                                   * q.setHint("openjpa.FetchPlan.ReadLockMode"
                                                                   * ,"WRITE"); FetchPlan fetch = oq.getFetchPlan();
                                                                   * fetch.setReadLockMode(LockModeType.WRITE);
                                                                   * fetch.setLockTimeout(1000); // 1 seconds }
                                                                   */
                                                                   q.setParameter("wfId", wfId);
                                                                   actions = q.getResultList();
                                                                   for (WorkflowActionBean a : actions) {
                                                                       WorkflowActionBean aa = getBeanForRunningAction(a);
                                                                       actionList.add(aa);
                                                                   }
                                                               }
                                                               catch (IllegalStateException e) {
                                                                   throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                                                               }
                                                               /*
                                                               * if (locking) { return actions; } else {
                                                               */
                                                               return actionList;
                                                               // }
                                                           }
                                                       });
        return actions;
    }

    /**
     * Loads given number of actions for the given Workflow. Also locks all the actions if locking is true.
     *
     * @param wfId Workflow ID
     * @param start offset for select statement
     * @param len number of Workflow Actions to be returned
     * @param locking true if Actions are to be locked
     * @return A List of WorkflowActionBean
     * @throws StoreException
     */
    public List<WorkflowActionBean> getActionsSubsetForWorkflow(final String wfId, final int start, final int len)
            throws StoreException {
        ParamChecker.notEmpty(wfId, "WorkflowID");
        List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
                                                       new Callable<List<WorkflowActionBean>>() {
                                                           public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
                                                                   InterruptedException {
                                                               List<WorkflowActionBean> actions;
                                                               List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
                                                               try {
                                                                   Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
                                                                   OpenJPAQuery oq = OpenJPAPersistence.cast(q);
                                                                   q.setParameter("wfId", wfId);
                                                                   q.setFirstResult(start - 1);
                                                                   q.setMaxResults(len);
                                                                   actions = q.getResultList();
                                                                   for (WorkflowActionBean a : actions) {
                                                                       WorkflowActionBean aa = getBeanForRunningAction(a);
                                                                       actionList.add(aa);
                                                                   }
                                                               }
                                                               catch (IllegalStateException e) {
                                                                   throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                                                               }
                                                               return actionList;
                                                           }
                                                       });
        return actions;
    }

    /**
     * Load All the actions that are pending for more than given time.
     *
     * @param minimumPendingAgeSecs Minimum Pending age in seconds
     * @return List of action beans
     * @throws StoreException
     */
    public List<WorkflowActionBean> getPendingActions(final long minimumPendingAgeSecs) throws StoreException {
        List<WorkflowActionBean> actions = doOperation("getPendingActions", new Callable<List<WorkflowActionBean>>() {
            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
                Timestamp ts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000);
                List<WorkflowActionBean> actionList = null;
                try {
                    Query q = entityManager.createNamedQuery("GET_PENDING_ACTIONS");
                    q.setParameter("pendingAge", ts);
                    actionList = q.getResultList();
                }
                catch (IllegalStateException e) {
                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                }
                return actionList;
            }
        });
        return actions;
    }

    /**
     * Load all the actions that are running and were last checked after now - checkAgeSecs
     *
     * @param checkAgeSecs check age in seconds.
     * @return List of action beans.
     * @throws StoreException
     */
    public List<WorkflowActionBean> getRunningActions(final long checkAgeSecs) throws StoreException {
        List<WorkflowActionBean> actions = doOperation("getRunningActions", new Callable<List<WorkflowActionBean>>() {

            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
                List<WorkflowActionBean> actions = new ArrayList<WorkflowActionBean>();
                Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
                try {
                    Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS");
                    q.setParameter("lastCheckTime", ts);
                    actions = q.getResultList();
                }
                catch (IllegalStateException e) {
                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                }

                return actions;
            }
        });
        return actions;
    }

    /**
     * Load All the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL.
     *
     * @param wfId String
     * @return List of action beans
     * @throws StoreException
     */
    public List<WorkflowActionBean> getRetryAndManualActions(final String wfId) throws StoreException {
        List<WorkflowActionBean> actions = doOperation("GET_RETRY_MANUAL_ACTIONS",
                new Callable<List<WorkflowActionBean>>() {
                    public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
                        List<WorkflowActionBean> actionList = null;
                        try {
                            Query q = entityManager.createNamedQuery("GET_RETRY_MANUAL_ACTIONS");
                            q.setParameter("wfId", wfId);
                            actionList = q.getResultList();
                        }
                        catch (IllegalStateException e) {
                            throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                        }

                        return actionList;
                    }
                });
        return actions;
    }

    /**
     * Loads all the jobs that are satisfying the given filter condition. Filters can be applied on user, group,
     * appName, status.
     *
     * @param filter Filter condition
     * @param start offset for select statement
     * @param len number of Workflows to be returned
     * @return A list of workflows
     * @throws StoreException
     */
    public WorkflowsInfo getWorkflowsInfo(final Map<String, List<String>> filter, final int start, final int len)
            throws StoreException {

        WorkflowsInfo workFlowsInfo = doOperation("getWorkflowsInfo", new Callable<WorkflowsInfo>() {
            @SuppressWarnings("unchecked")
            public WorkflowsInfo call() throws SQLException, StoreException {

                List<String> orArray = new ArrayList<String>();
                List<String> colArray = new ArrayList<String>();
                List<String> valArray = new ArrayList<String>();
                StringBuilder sb = new StringBuilder("");
                boolean isStatus = false;
                boolean isGroup = false;
                boolean isAppName = false;
                boolean isUser = false;
                boolean isEnabled = false;
                int index = 0;
                for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
                    String colName = null;
                    String colVar = null;
                    if (entry.getKey().equals(OozieClient.FILTER_GROUP)) {
                        List<String> values = filter.get(OozieClient.FILTER_GROUP);
                        colName = "group";
                        for (int i = 0; i < values.size(); i++) {
                            colVar = "group";
                            colVar = colVar + index;
                            if (!isEnabled && !isGroup) {
                                sb.append(seletStr).append(" where w.group IN (:group" + index);
                                isGroup = true;
                                isEnabled = true;
                            }
                            else {
                                if (isEnabled && !isGroup) {
                                    sb.append(" and w.group IN (:group" + index);
                                    isGroup = true;
                                }
                                else {
                                    if (isGroup) {
                                        sb.append(", :group" + index);
                                    }
                                }
                            }
                            if (i == values.size() - 1) {
                                sb.append(")");
                            }
                            index++;
                            valArray.add(values.get(i));
                            orArray.add(colName);
                            colArray.add(colVar);
                        }
                    }
                    else {
                        if (entry.getKey().equals(OozieClient.FILTER_STATUS)) {
                            List<String> values = filter.get(OozieClient.FILTER_STATUS);
                            colName = "status";
                            for (int i = 0; i < values.size(); i++) {
                                colVar = "status";
                                colVar = colVar + index;
                                if (!isEnabled && !isStatus) {
                                    sb.append(seletStr).append(" where w.statusStr IN (:status" + index);
                                    isStatus = true;
                                    isEnabled = true;
                                }
                                else {
                                    if (isEnabled && !isStatus) {
                                        sb.append(" and w.statusStr IN (:status" + index);
                                        isStatus = true;
                                    }
                                    else {
                                        if (isStatus) {
                                            sb.append(", :status" + index);
                                        }
                                    }
                                }
                                if (i == values.size() - 1) {
                                    sb.append(")");
                                }
                                index++;
                                valArray.add(values.get(i));
                                orArray.add(colName);
                                colArray.add(colVar);
                            }
                        }
                        else {
                            if (entry.getKey().equals(OozieClient.FILTER_NAME)) {
                                List<String> values = filter.get(OozieClient.FILTER_NAME);
                                colName = "appName";
                                for (int i = 0; i < values.size(); i++) {
                                    colVar = "appName";
                                    colVar = colVar + index;
                                    if (!isEnabled && !isAppName) {
                                        sb.append(seletStr).append(" where w.appName IN (:appName" + index);
                                        isAppName = true;
                                        isEnabled = true;
                                    }
                                    else {
                                        if (isEnabled && !isAppName) {
                                            sb.append(" and w.appName IN (:appName" + index);
                                            isAppName = true;
                                        }
                                        else {
                                            if (isAppName) {
                                                sb.append(", :appName" + index);
                                            }
                                        }
                                    }
                                    if (i == values.size() - 1) {
                                        sb.append(")");
                                    }
                                    index++;
                                    valArray.add(values.get(i));
                                    orArray.add(colName);
                                    colArray.add(colVar);
                                }
                            }
                            else {
                                if (entry.getKey().equals(OozieClient.FILTER_USER)) {
                                    List<String> values = filter.get(OozieClient.FILTER_USER);
                                    colName = "user";
                                    for (int i = 0; i < values.size(); i++) {
                                        colVar = "user";
                                        colVar = colVar + index;
                                        if (!isEnabled && !isUser) {
                                            sb.append(seletStr).append(" where w.user IN (:user" + index);
                                            isUser = true;
                                            isEnabled = true;
                                        }
                                        else {
                                            if (isEnabled && !isUser) {
                                                sb.append(" and w.user IN (:user" + index);
                                                isUser = true;
                                            }
                                            else {
                                                if (isUser) {
                                                    sb.append(", :user" + index);
                                                }
                                            }
                                        }
                                        if (i == values.size() - 1) {
                                            sb.append(")");
                                        }
                                        index++;
                                        valArray.add(values.get(i));
                                        orArray.add(colName);
                                        colArray.add(colVar);
                                    }
                                }
                            }
                        }
                    }
                }

                int realLen = 0;

                Query q = null;
                Query qTotal = null;
                if (orArray.size() == 0) {
                    q = entityManager.createNamedQuery("GET_WORKFLOWS_COLUMNS");
                    q.setFirstResult(start - 1);
                    q.setMaxResults(len);
                    qTotal = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT");
                }
                else {
                    if (orArray.size() > 0) {
                        StringBuilder sbTotal = new StringBuilder(sb);
                        sb.append(" order by w.startTimestamp desc ");
                        XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
                        q = entityManager.createQuery(sb.toString());
                        q.setFirstResult(start - 1);
                        q.setMaxResults(len);
                        qTotal = entityManager.createQuery(sbTotal.toString().replace(seletStr, countStr));
                        for (int i = 0; i < orArray.size(); i++) {
                            q.setParameter(colArray.get(i), valArray.get(i));
                            qTotal.setParameter(colArray.get(i), valArray.get(i));
                        }
                    }
                }

                OpenJPAQuery kq = OpenJPAPersistence.cast(q);
                JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
                fetch.setFetchBatchSize(20);
                fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
                fetch.setFetchDirection(FetchDirection.FORWARD);
                fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
                List<?> resultList = q.getResultList();
                List<Object[]> objectArrList = (List<Object[]>) resultList;
                List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();

                for (Object[] arr : objectArrList) {
                    WorkflowJobBean ww = getBeanForWorkflowFromArray(arr);
                    wfBeansList.add(ww);
                }

                realLen = ((Long) qTotal.getSingleResult()).intValue();

                return new WorkflowsInfo(wfBeansList, start, len, realLen);
            }
        });
        return workFlowsInfo;

    }

    /**
     * Load the Workflow and all Action details and return a WorkflowJobBean. Workflow Instance is not loaded
     *
     * @param id Workflow Id
     * @return Workflow Bean
     * @throws StoreException If Workflow doesn't exist
     */
    public WorkflowJobBean getWorkflowInfo(final String id) throws StoreException {
        ParamChecker.notEmpty(id, "WorkflowID");
        WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
            public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
                WorkflowJobBean wfBean = null;
                wfBean = getWorkflowforInfo(id, false);
                if (wfBean == null) {
                    throw new StoreException(ErrorCode.E0604, id);
                }
                else {
                    wfBean.setActions(getActionsForWorkflow(id, false));
                }
                return wfBean;
            }
        });
        return wfBean;
    }

    /**
     * Load the Workflow and subset Actions details and return a WorkflowJobBean. Workflow Instance is not loaded
     *
     * @param id Workflow Id
     * @param start offset for select statement for actions
     * @param len number of Workflow Actions to be returned
     * @return Workflow Bean
     * @throws StoreException If Workflow doesn't exist
     */
    public WorkflowJobBean getWorkflowInfoWithActionsSubset(final String id, final int start, final int len) throws StoreException {
        ParamChecker.notEmpty(id, "WorkflowID");
        WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
            public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
                WorkflowJobBean wfBean = null;
                wfBean = getWorkflowforInfo(id, false);
                if (wfBean == null) {
                    throw new StoreException(ErrorCode.E0604, id);
                }
                else {
                    wfBean.setActions(getActionsSubsetForWorkflow(id, start, len));
                }
                return wfBean;
            }
        });
        return wfBean;
    }

    /**
     * Get the Workflow ID with given external ID which will be assigned for the subworkflows.
     *
     * @param externalId external ID
     * @return Workflow ID
     * @throws StoreException if there is no job with external ID
     */
    public String getWorkflowIdForExternalId(final String externalId) throws StoreException {
        ParamChecker.notEmpty(externalId, "externalId");
        String wfId = doOperation("getWorkflowIdForExternalId", new Callable<String>() {
            public String call() throws SQLException, StoreException {
                String id = "";
                Query q = entityManager.createNamedQuery("GET_WORKFLOW_ID_FOR_EXTERNAL_ID");
                q.setParameter("externalId", externalId);
                List<String> w = q.getResultList();
                if (w.size() == 0) {
                    id = "";
                }
                else {
                    int index = w.size() - 1;
                    id = w.get(index);
                }
                return id;
            }
        });
        return wfId;
    }

    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;

    /**
     * Purge the Workflows Completed older than given days.
     *
     * @param olderThanDays number of days for which to preserve the workflows
     * @throws StoreException
     */
    public void purge(final long olderThanDays, final int limit) throws StoreException {
        doOperation("purge", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
                Query q = entityManager.createNamedQuery("GET_COMPLETED_WORKFLOWS_OLDER_THAN");
                q.setParameter("endTime", maxEndTime);
                q.setMaxResults(limit);
                List<WorkflowJobBean> workflows = q.getResultList();
                int actionDeleted = 0;
                if (workflows.size() != 0) {
                    for (WorkflowJobBean w : workflows) {
                        String wfId = w.getId();
                        entityManager.remove(w);
                        Query g = entityManager.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW");
                        g.setParameter("wfId", wfId);
                        actionDeleted += g.executeUpdate();
                    }
                }
                XLog.getLog(getClass()).debug("ENDED Workflow Purge deleted jobs :" + workflows.size() + " and actions " + actionDeleted);
                return null;
            }
        });
    }

    private <V> V doOperation(String name, Callable<V> command) throws StoreException {
        try {
            Instrumentation.Cron cron = new Instrumentation.Cron();
            cron.start();
            V retVal;
            try {
                retVal = command.call();
            }
            finally {
                cron.stop();
            }
            Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
            return retVal;
        }
        catch (StoreException ex) {
            throw ex;
        }
        catch (SQLException ex) {
            throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
        }
        catch (Exception e) {
            throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
        }
    }

    private WorkflowJobBean getWorkflowOnly(final String id, boolean locking) throws SQLException,
            InterruptedException, StoreException {
        WorkflowJobBean wfBean = null;
        Query q = entityManager.createNamedQuery("GET_WORKFLOW");
        /*
         * if (locking) { // q.setHint("openjpa.FetchPlan.ReadLockMode","READ");
         * OpenJPAQuery oq = OpenJPAPersistence.cast(q); FetchPlan fetch =
         * oq.getFetchPlan(); fetch.setReadLockMode(LockModeType.WRITE);
         * fetch.setLockTimeout(-1); // unlimited }
         */
        q.setParameter("id", id);
        List<WorkflowJobBean> w = q.getResultList();
        if (w.size() > 0) {
            wfBean = w.get(0);
        }
        return wfBean;
        // return getBeanForRunningWorkflow(wfBean);
    }

    private WorkflowJobBean getWorkflowforInfo(final String id, boolean locking) throws SQLException,
            InterruptedException, StoreException {
        WorkflowJobBean wfBean = null;
        Query q = entityManager.createNamedQuery("GET_WORKFLOW");
        q.setParameter("id", id);
        List<WorkflowJobBean> w = q.getResultList();
        if (w.size() > 0) {
            wfBean = w.get(0);
            return getBeanForRunningWorkflow(wfBean);
        }
        return null;
    }

    private WorkflowJobBean getBeanForRunningWorkflow(WorkflowJobBean w) throws SQLException {
        WorkflowJobBean wfBean = new WorkflowJobBean();
        wfBean.setId(w.getId());
        wfBean.setAppName(w.getAppName());
        wfBean.setAppPath(w.getAppPath());
        wfBean.setConfBlob(w.getConfBlob());
        wfBean.setGroup(w.getGroup());
        wfBean.setRun(w.getRun());
        wfBean.setUser(w.getUser());
        wfBean.setCreatedTime(w.getCreatedTime());
        wfBean.setEndTime(w.getEndTime());
        wfBean.setExternalId(w.getExternalId());
        wfBean.setLastModifiedTime(w.getLastModifiedTime());
        wfBean.setLogToken(w.getLogToken());
        wfBean.setProtoActionConfBlob(w.getProtoActionConfBlob());
        wfBean.setSlaXmlBlob(w.getSlaXmlBlob());
        wfBean.setStartTime(w.getStartTime());
        wfBean.setStatus(w.getStatus());
        wfBean.setWfInstanceBlob(w.getWfInstanceBlob());
        return wfBean;
    }

    private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {

        WorkflowJobBean wfBean = new WorkflowJobBean();
        wfBean.setId((String) arr[0]);
        if (arr[1] != null) {
            wfBean.setAppName((String) arr[1]);
        }
        if (arr[2] != null) {
            wfBean.setStatus(Status.valueOf((String) arr[2]));
        }
        if (arr[3] != null) {
            wfBean.setRun((Integer) arr[3]);
        }
        if (arr[4] != null) {
            wfBean.setUser((String) arr[4]);
        }
        if (arr[5] != null) {
            wfBean.setGroup((String) arr[5]);
        }
        if (arr[6] != null) {
            wfBean.setCreatedTime((Timestamp) arr[6]);
        }
        if (arr[7] != null) {
            wfBean.setStartTime((Timestamp) arr[7]);
        }
        if (arr[8] != null) {
            wfBean.setLastModifiedTime((Timestamp) arr[8]);
        }
        if (arr[9] != null) {
            wfBean.setEndTime((Timestamp) arr[9]);
        }
        return wfBean;
    }

    private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) throws SQLException {
        if (a != null) {
            WorkflowActionBean action = new WorkflowActionBean();
            action.setId(a.getId());
            action.setConfBlob(a.getConfBlob());
            action.setConsoleUrl(a.getConsoleUrl());
            action.setDataBlob(a.getDataBlob());
            action.setStatsBlob(a.getStatsBlob());
            action.setExternalChildIDsBlob(a.getExternalChildIDsBlob());
            action.setErrorInfo(a.getErrorCode(), a.getErrorMessage());
            action.setExternalId(a.getExternalId());
            action.setExternalStatus(a.getExternalStatus());
            action.setName(a.getName());
            action.setCred(a.getCred());
            action.setRetries(a.getRetries());
            action.setTrackerUri(a.getTrackerUri());
            action.setTransition(a.getTransition());
            action.setType(a.getType());
            action.setEndTime(a.getEndTime());
            action.setExecutionPath(a.getExecutionPath());
            action.setLastCheckTime(a.getLastCheckTime());
            action.setLogToken(a.getLogToken());
            if (a.isPending() == true) {
                action.setPending();
            }
            action.setPendingAge(a.getPendingAge());
            action.setSignalValue(a.getSignalValue());
            action.setSlaXmlBlob(a.getSlaXmlBlob());
            action.setStartTime(a.getStartTime());
            action.setStatus(a.getStatus());
            action.setJobId(a.getWfId());
            action.setUserRetryCount(a.getUserRetryCount());
            action.setUserRetryInterval(a.getUserRetryInterval());
            action.setUserRetryMax(a.getUserRetryMax());
            return action;
        }
        return null;
    }
}

In my next blog post, I will talk more about the MapperLauncher, the DAGEngine, and more. Stay tuned.

YARN RPC Part II

YarnRPC is the abstract base class that defines several abstract methods for returning an RPC client proxy and an RPC server. HadoopYarnProtoRPC extends YarnRPC and provides concrete implementations of those abstract methods.


public abstract class YarnRPC {

 public abstract Object getProxy(Class protocol, InetSocketAddress addr,
      Configuration conf);

  public abstract void stopProxy(Object proxy, Configuration conf);

  public abstract Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers, String portRangeConfig);

  public Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers) {
    return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
        null);
  }

}
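
On the client side, you would typically go through YarnRPC.create() and cast the returned proxy to the protocol interface you asked for. A minimal sketch follows; the ResourceManager address and port are placeholders, not values taken from this post.

Configuration conf = new YarnConfiguration();
// YarnRPC.create() reads the configured implementation, HadoopYarnProtoRPC by default.
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = NetUtils.createSocketAddr("rm.example.com:8032");
// getProxy() returns Object, so the caller casts it to the requested protocol.
ClientRMProtocol clientRM =
    (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, conf);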

Internally, HadoopYarnProtoRPC uses RpcFactoryProvider, which acts as a singleton factory returning different implementations of the RpcServerFactory and RpcClientFactory interfaces.

Currently, YARN ships RpcServerFactoryPBImpl, which implements the RpcServerFactory interface, and RpcClientFactoryPBImpl, which implements the RpcClientFactory interface. These PB factories in turn let us inject different Protocol Buffer protocol implementations, chosen by the protocol class, when the underlying Hadoop RPC creates the RPC server and the client proxy.

[Image: RpcClientAndServerFactory]

Some examples of Protocol Buffer protocol implementations are

org.apache.hadoop.yarn.api.impl.pb.service
AMRMProtocolPBServiceImpl (will be injected if the protocol is AMRMProtocol)
ClientRMProtocolPBServiceImpl (will be injected if the protocol is ClientRMProtocol)

org.apache.hadoop.yarn.api.impl.pb.client
AMRMProtocolPBClientImpl
ClientRMProtocolPBClientImpl
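
To see how the PB factories pick the right implementation, here is a small self-contained sketch of the naming convention they rely on. It mirrors getPbServiceImplClassName() and getProtoClassName() from RpcServerFactoryPBImpl shown below; the AMRMProtocol name is used purely as an illustration.

public class PbNamingConventionDemo {
  public static void main(String[] args) {
    String protocol = "org.apache.hadoop.yarn.api.AMRMProtocol";
    String pkg = protocol.substring(0, protocol.lastIndexOf('.'));    // org.apache.hadoop.yarn.api
    String name = protocol.substring(protocol.lastIndexOf('.') + 1);  // AMRMProtocol
    // Server-side PB service implementation instantiated reflectively by the factory:
    System.out.println(pkg + ".impl.pb.service." + name + "PBServiceImpl");
    // Generated protobuf service class whose newReflectiveBlockingService() is then invoked:
    System.out.println("org.apache.hadoop.yarn.proto." + name + "$" + name + "Service");
  }
}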


/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.factories.impl.pb;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.factories.RpcServerFactory;

import com.google.protobuf.BlockingService;

public class RpcServerFactoryPBImpl implements RpcServerFactory {

  private static final Log LOG = LogFactory.getLog(RpcServerFactoryPBImpl.class);
  private static final String PROTO_GEN_PACKAGE_NAME = "org.apache.hadoop.yarn.proto";
  private static final String PROTO_GEN_CLASS_SUFFIX = "Service";
  private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb.service";
  private static final String PB_IMPL_CLASS_SUFFIX = "PBServiceImpl";
  
  private static final RpcServerFactoryPBImpl self = new RpcServerFactoryPBImpl();

  private Configuration localConf = new Configuration();
  private ConcurrentMap<Class<?>, Constructor<?>> serviceCache = new ConcurrentHashMap<Class<?>, Constructor<?>>();
  private ConcurrentMap<Class<?>, Method> protoCache = new ConcurrentHashMap<Class<?>, Method>();
  
  public static RpcServerFactoryPBImpl get() {
    return RpcServerFactoryPBImpl.self;
  }
  

  private RpcServerFactoryPBImpl() {
  }
  
  public Server getServer(Class<?> protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers)
      throws YarnException {
    return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
        null);
  }
  
  @Override
  public Server getServer(Class<?> protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
      String portRangeConfig)
      throws YarnException {
    
    Constructor<?> constructor = serviceCache.get(protocol);
    if (constructor == null) {
      Class<?> pbServiceImplClazz = null;
      try {
        pbServiceImplClazz = localConf
            .getClassByName(getPbServiceImplClassName(protocol));
      } catch (ClassNotFoundException e) {
        throw new YarnException("Failed to load class: ["
            + getPbServiceImplClassName(protocol) + "]", e);
      }
      try {
        constructor = pbServiceImplClazz.getConstructor(protocol);
        constructor.setAccessible(true);
        serviceCache.putIfAbsent(protocol, constructor);
      } catch (NoSuchMethodException e) {
        throw new YarnException("Could not find constructor with params: "
            + Long.TYPE + ", " + InetSocketAddress.class + ", "
            + Configuration.class, e);
      }
    }
    
    Object service = null;
    try {
      service = constructor.newInstance(instance);
    } catch (InvocationTargetException e) {
      throw new YarnException(e);
    } catch (IllegalAccessException e) {
      throw new YarnException(e);
    } catch (InstantiationException e) {
      throw new YarnException(e);
    }

    Class<?> pbProtocol = service.getClass().getInterfaces()[0];
    Method method = protoCache.get(protocol);
    if (method == null) {
      Class<?> protoClazz = null;
      try {
        protoClazz = localConf.getClassByName(getProtoClassName(protocol));
      } catch (ClassNotFoundException e) {
        throw new YarnException("Failed to load class: ["
            + getProtoClassName(protocol) + "]", e);
      }
      try {
        method = protoClazz.getMethod("newReflectiveBlockingService",
            pbProtocol.getInterfaces()[0]);
        method.setAccessible(true);
        protoCache.putIfAbsent(protocol, method);
      } catch (NoSuchMethodException e) {
        throw new YarnException(e);
      }
    }
    
    try {
      return createServer(pbProtocol, addr, conf, secretManager, numHandlers,
          (BlockingService)method.invoke(null, service), portRangeConfig);
    } catch (InvocationTargetException e) {
      throw new YarnException(e);
    } catch (IllegalAccessException e) {
      throw new YarnException(e);
    } catch (IOException e) {
      throw new YarnException(e);
    }
  }
  
  private String getProtoClassName(Class<?> clazz) {
    String srcClassName = getClassName(clazz);
    return PROTO_GEN_PACKAGE_NAME + "." + srcClassName + "$" + srcClassName + PROTO_GEN_CLASS_SUFFIX;  
  }
  
  private String getPbServiceImplClassName(Class<?> clazz) {
    String srcPackagePart = getPackageName(clazz);
    String srcClassName = getClassName(clazz);
    String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX;
    String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX;
    return destPackagePart + "." + destClassPart;
  }
  
  private String getClassName(Class<?> clazz) {
    String fqName = clazz.getName();
    return (fqName.substring(fqName.lastIndexOf(".") + 1, fqName.length()));
  }
  
  private String getPackageName(Class<?> clazz) {
    return clazz.getPackage().getName();
  }

  private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, 
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, 
      BlockingService blockingService, String portRangeConfig) throws IOException {
    RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
    RPC.Server server = RPC.getServer(pbProtocol, blockingService, 
        addr.getHostName(), addr.getPort(), numHandlers, false, conf, 
        secretManager, portRangeConfig);
    LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server");
    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
    return server;
  }
}

Events in MapReduce V2

In the last post, we looked into the YARN event structure. Today, we will look at the different events introduced in MapReduce V2. These events are defined in the org.apache.hadoop.mapreduce.v2.app.job.event package.

[Image: AllEventTypes]

You will find the different event types defined in the JobEventType, TaskEventType, and TaskAttemptEventType enum classes. A short sketch of how these events are dispatched follows the JobEventType listing below.


/**
 * Event types handled by Job.
 */
public enum JobEventType {

  //Producer:Client
  JOB_KILL,

  //Producer:MRAppMaster
  JOB_INIT,
  JOB_START,

  //Producer:Task
  JOB_TASK_COMPLETED,
  JOB_MAP_TASK_RESCHEDULED,
  JOB_TASK_ATTEMPT_COMPLETED,

  //Producer:CommitterEventHandler
  JOB_SETUP_COMPLETED,
  JOB_SETUP_FAILED,
  JOB_COMMIT_COMPLETED,
  JOB_COMMIT_FAILED,
  JOB_ABORT_COMPLETED,

  //Producer:Job
  JOB_COMPLETED,

  //Producer:Any component
  JOB_DIAGNOSTIC_UPDATE,
  INTERNAL_ERROR,
  JOB_COUNTER_UPDATE,

  //Producer:TaskAttemptListener
  JOB_TASK_ATTEMPT_FETCH_FAILURE,

  //Producer:RMContainerAllocator
  JOB_UPDATED_NODES

}
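
To tie this back to the event structure from the last post, here is a hedged sketch of how such an event travels through the central AsyncDispatcher inside the MRAppMaster. The handler registration and the JobEvent constructor follow the usual YARN event pattern; the JobId creation via Records is only illustrative.

AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(new Configuration());
dispatcher.start();

// The job implementation registers itself for all JobEventType events ...
dispatcher.register(JobEventType.class, new EventHandler<JobEvent>() {
  @Override
  public void handle(JobEvent event) {
    System.out.println("Job " + event.getJobId() + " received " + event.getType());
  }
});

// ... and any component can then fire an event at it through the dispatcher.
JobId jobId = Records.newRecord(JobId.class);  // in the real AM this comes from the application attempt
dispatcher.getEventHandler().handle(new JobEvent(jobId, JobEventType.JOB_KILL));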