Managing a Big Data Pipeline Using Oozie

If your team is building a big data pipeline that spans everything from data ingestion to processing and analytics, it is crucial to use something like Oozie to manage the entire workflow. Without a workflow manager, the pipeline is difficult to manage and monitor, and it becomes prone to error and mismanagement. Without Oozie, you will most likely fall back to shell scripts and cron to trigger, run and monitor each individual step: a Hive query, a MapReduce job, pre- and post-validation steps, and so on. Writing and maintaining these scripts is a cumbersome task, and if you are not careful, the pipeline soon turns against you and becomes an unmanageable, unwieldy monster.

I recently overhauled a complex data pipeline, and built new ones, using Oozie as the workflow manager, and the difference is like night and day. Not only does it let us piece together and coordinate different processing steps easily, it also makes the flow much easier to explain to others (e.g. developers, QA, the product owner). The built-in email notification action lets us detect errors at every step of the pipeline and notify the team in charge. In addition, it is easy to parameterize the arguments passed to each job.
You can even add pre- and post-validation actions to sanity check your input and output, anything from making sure a Hive output table or output file is not empty, all the way up to a full-blown anomaly detection pipeline.

In summary, a workflow management tool like Oozie gives you:

1) Easy orchestration and coordination of a complex data pipeline made up of Hive queries, Java MapReduce jobs, Pig, Sqoop and shell scripts.
2) Easy configuration of the individual processing steps, e.g. using job.properties to configure and parameterize arguments.
3) Monitoring of the whole pipeline, with email notifications triggered when errors do happen.

Oozie is not without its pain points, but it is an essential tool for building and managing a robust data pipeline.
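
To make this concrete, here is a minimal sketch of what such a workflow can look like. Everything in it (names, paths, schema versions, email addresses) is made up for illustration; the ${...} values would come from job.properties:

<workflow-app name="example-pipeline" xmlns="uri:oozie:workflow:0.5">
    <start to="hive-step"/>

    <!-- Run a parameterized Hive script; script path and target database
         are supplied through job.properties -->
    <action name="hive-step">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <script>${hiveScript}</script>
            <param>targetDb=${targetDb}</param>
        </hive>
        <ok to="end"/>
        <error to="notify-failure"/>
    </action>

    <!-- Notify the team in charge when a step fails -->
    <action name="notify-failure">
        <email xmlns="uri:oozie:email-action:0.1">
            <to>${errorEmail}</to>
            <subject>Pipeline failed: ${wf:id()}</subject>
            <body>Failed node: ${wf:lastErrorNode()}</body>
        </email>
        <ok to="fail"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Pipeline failed at ${wf:lastErrorNode()}</message>
    </kill>
    <end name="end"/>
</workflow-app>

A matching job.properties would then define jobTracker, nameNode, hiveScript, targetDb and errorEmail, so the same workflow can be pointed at a different cluster or dataset without touching the XML.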

Oozie is a web application that runs on top of Apache Tomcat (the Catalina servlet engine). Take a look at its web.xml and you will find a servlet context listener, org.apache.oozie.servlet.ServicesLoader, that initializes Oozie's services.


<listener>
<listener-class>org.apache.oozie.servlet.ServicesLoader</listener-class>
</listener>

For context, here is the complete web.xml from the Oozie web application:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<!DOCTYPE web-app PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN" "http://java.sun.com/dtd/web-app_2_3.dtd" [
    <!ENTITY web-common SYSTEM "web-common.xml">
]>

<web-app>

    <!--
      ========================================================================
      IMPORTANT: ANY CHANGES TO THE SERVLETS, SERVLET MAPPINGS, LISTENERS, ETC
                 MUST BE REFLECTED IN distro/src/main/tomcat/ssl-web.xml
                 AS WELL.
      ========================================================================
    -->

    <display-name>OOZIE</display-name>

    <!-- Listeners -->
    <listener>
        <listener-class>org.apache.oozie.servlet.ServicesLoader</listener-class>
    </listener>

    <!-- Servlets -->
    <servlet>
        <servlet-name>versions</servlet-name>
        <display-name>WS API for Workflow Instances</display-name>
        <servlet-class>org.apache.oozie.servlet.VersionServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet>
        <servlet-name>v0admin</servlet-name>
        <display-name>Oozie admin</display-name>
        <servlet-class>org.apache.oozie.servlet.V0AdminServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet>
        <servlet-name>v1admin</servlet-name>
        <display-name>Oozie admin</display-name>
        <servlet-class>org.apache.oozie.servlet.V1AdminServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet>
        <servlet-name>v2admin</servlet-name>
        <display-name>Oozie admin</display-name>
        <servlet-class>org.apache.oozie.servlet.V2AdminServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet>
        <servlet-name>callback</servlet-name>
        <display-name>Callback Notification</display-name>
        <servlet-class>org.apache.oozie.servlet.CallbackServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet>
        <servlet-name>v0jobs</servlet-name>
        <display-name>WS API for Workflow Jobs</display-name>
        <servlet-class>org.apache.oozie.servlet.V0JobsServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet>
        <servlet-name>v1jobs</servlet-name>
        <display-name>WS API for Workflow Jobs</display-name>
        <servlet-class>org.apache.oozie.servlet.V1JobsServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet>
        <servlet-name>v0job</servlet-name>
        <display-name>WS API for a specific Workflow Job</display-name>
        <servlet-class>org.apache.oozie.servlet.V0JobServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet>
        <servlet-name>v1job</servlet-name>
        <display-name>WS API for a specific Workflow Job</display-name>
        <servlet-class>org.apache.oozie.servlet.V1JobServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet>
        <servlet-name>v2job</servlet-name>
        <display-name>WS API for a specific Workflow Job</display-name>
        <servlet-class>org.apache.oozie.servlet.V2JobServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet>
        <servlet-name>sla-event</servlet-name>
        <display-name>WS API for specific SLA Events</display-name>
        <servlet-class>org.apache.oozie.servlet.SLAServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet>
        <servlet-name>v2sla</servlet-name>
        <display-name>WS API for specific SLA Events</display-name>
        <servlet-class>org.apache.oozie.servlet.V2SLAServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <!-- servlet-mapping -->
    <servlet-mapping>
        <servlet-name>versions</servlet-name>
        <url-pattern>/versions</url-pattern>
    </servlet-mapping>

    <servlet-mapping>
        <servlet-name>v0admin</servlet-name>
        <url-pattern>/v0/admin/*</url-pattern>
    </servlet-mapping>

    <servlet-mapping>
        <servlet-name>v1admin</servlet-name>
        <url-pattern>/v1/admin/*</url-pattern>
    </servlet-mapping>

    <servlet-mapping>
        <servlet-name>v2admin</servlet-name>
        <url-pattern>/v2/admin/*</url-pattern>
    </servlet-mapping>

    <servlet-mapping>
        <servlet-name>callback</servlet-name>
        <url-pattern>/callback/*</url-pattern>
    </servlet-mapping>

    <servlet-mapping>
        <servlet-name>v0jobs</servlet-name>
        <url-pattern>/v0/jobs</url-pattern>
    </servlet-mapping>

    <servlet-mapping>
        <servlet-name>v1jobs</servlet-name>
        <url-pattern>/v1/jobs</url-pattern>
    </servlet-mapping>

    <servlet-mapping>
        <servlet-name>v1jobs</servlet-name>
        <url-pattern>/v2/jobs</url-pattern>
    </servlet-mapping>

    <servlet-mapping>
        <servlet-name>v0job</servlet-name>
        <url-pattern>/v0/job/*</url-pattern>
    </servlet-mapping>

    <servlet-mapping>
        <servlet-name>v1job</servlet-name>
        <url-pattern>/v1/job/*</url-pattern>
    </servlet-mapping>

    <servlet-mapping>
        <servlet-name>v2job</servlet-name>
        <url-pattern>/v2/job/*</url-pattern>
    </servlet-mapping>

    <servlet-mapping>
        <servlet-name>sla-event</servlet-name>
        <url-pattern>/v1/sla/*</url-pattern>
    </servlet-mapping>

    <servlet-mapping>
        <servlet-name>v2sla</servlet-name>
        <url-pattern>/v2/sla/*</url-pattern>
    </servlet-mapping>

    <!-- welcome-file -->
    <welcome-file-list>
        <welcome-file>index.jsp</welcome-file>
    </welcome-file-list>

    <filter>
        <filter-name>hostnameFilter</filter-name>
        <filter-class>org.apache.oozie.servlet.HostnameFilter</filter-class>
    </filter>

    <filter>
        <filter-name>authenticationfilter</filter-name>
        <filter-class>org.apache.oozie.servlet.AuthFilter</filter-class>
    </filter>

    <filter-mapping>
        <filter-name>hostnameFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <filter-mapping>
        <filter-name>authenticationfilter</filter-name>
        <url-pattern>/versions/*</url-pattern>
    </filter-mapping>

    <filter-mapping>
        <filter-name>authenticationfilter</filter-name>
        <url-pattern>/v0/*</url-pattern>
    </filter-mapping>

    <filter-mapping>
        <filter-name>authenticationfilter</filter-name>
        <url-pattern>/v1/*</url-pattern>
    </filter-mapping>

    <filter-mapping>
        <filter-name>authenticationfilter</filter-name>
        <url-pattern>/v2/*</url-pattern>
    </filter-mapping>

    <filter-mapping>
        <filter-name>authenticationfilter</filter-name>
        <url-pattern>/index.jsp</url-pattern>
    </filter-mapping>

    <filter-mapping>
        <filter-name>authenticationfilter</filter-name>
        <url-pattern>/admin/*</url-pattern>
    </filter-mapping>

    <filter-mapping>
        <filter-name>authenticationfilter</filter-name>
        <url-pattern>*.js</url-pattern>
    </filter-mapping>

    <filter-mapping>
        <filter-name>authenticationfilter</filter-name>
        <url-pattern>/ext-2.2/*</url-pattern>
    </filter-mapping>

    <filter-mapping>
        <filter-name>authenticationfilter</filter-name>
        <url-pattern>/docs/*</url-pattern>
    </filter-mapping>

</web-app>

ServicesLoader implements the ServletContextListener interface. During initialization, it creates and initializes the Services singleton, which in turn loads all the different services used in Oozie, for example ActionService, AuthorizationService, ConfigurationService, CoordinatorService, JPAService, and so on.

After initialization, the Oozie servlets can use these services to fulfill incoming requests. For example, JsonRestServlet, the base class for the Oozie web service API, uses the InstrumentationService and ProxyUserService.


package org.apache.oozie.servlet;

import org.apache.oozie.service.Services;

import javax.servlet.ServletContextListener;
import javax.servlet.ServletContextEvent;

/**
 * Webapp context listener that initializes Oozie {@link Services}.
 */
public class ServicesLoader implements ServletContextListener {
    private static Services services;
    private static boolean sslEnabled = false;

    /**
     * Initialize Oozie services.
     *
     * @param event context event.
     */
    public void contextInitialized(ServletContextEvent event) {
        try {
            String ssl = event.getServletContext().getInitParameter("ssl.enabled");
            if (ssl != null) {
                sslEnabled = true;
            }

            services = new Services();
            services.init();
        }
        catch (Throwable ex) {
            System.out.println();
            System.out.println("ERROR: Oozie could not be started");
            System.out.println();
            System.out.println("REASON: " + ex.toString());
            System.out.println();
            System.out.println("Stacktrace:");
            System.out.println("-----------------------------------------------------------------");
            ex.printStackTrace(System.out);
            System.out.println("-----------------------------------------------------------------");
            System.out.println();
            System.exit(1);
        }
    }

    /**
     * Destroy Oozie services.
     *
     * @param event context event.
     */
    public void contextDestroyed(ServletContextEvent event) {
        services.destroy();
    }

    public static boolean isSSLEnabled() {
        return sslEnabled;
    }
}
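
Once Services.init() has run, any component can look up a loaded service through the Services singleton. A minimal sketch (illustrative only; the wrapper class is made up, but the lookup pattern is the one Oozie's own servlets use):

import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;

// Illustrative helper: how a servlet or command can obtain a service once
// ServicesLoader has called Services.init().
public class ServiceLookupExample {
    public static JPAService lookupJpaService() {
        // Services.get() returns the singleton created by ServicesLoader;
        // get(Class) returns the loaded service instance of that type.
        return Services.get().get(JPAService.class);
    }
}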

If you would like to take a look at Oozie’s services layer, check out my blog post https://weishungchung.com/2013/11/24/a-look-into-oozie-internals/

Randomness in Random Forest

In the last post, we looked at the bagging process, where a random sample of the input is used to build each individual tree. This is the first of the two kinds of randomness used in the algorithm. The second kind is used in the node splitting process: at each node, a random subset of m attributes out of all the attributes is used to compute the best split based on information gain.
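
As a quick refresher on the first kind of randomness, here is a minimal bootstrap sampling sketch (purely illustrative, not Mahout's implementation):

import java.util.Random;

// Draw a bootstrap sample: n indices sampled uniformly with replacement,
// so each tree sees a slightly different view of the training data.
public class BootstrapSample {
    public static int[] sample(int n, Random rng) {
        int[] indices = new int[n];
        for (int i = 0; i < n; i++) {
            indices[i] = rng.nextInt(n);
        }
        return indices;
    }
}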

If m is not specified in the algorithm, then the following logic is used to determine m.

For a regression tree,

m = (int) Math.ceil(total_number_attributes / 3.0);

For a classification tree,

m = (int) Math.ceil(Math.sqrt(total_number_attributes));
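
For instance, for a hypothetical dataset with 12 input attributes, both defaults happen to work out to m = 4:

int totalNumberAttributes = 12;                                           // hypothetical attribute count
int mRegression = (int) Math.ceil(totalNumberAttributes / 3.0);           // ceil(4.0)  = 4
int mClassification = (int) Math.ceil(Math.sqrt(totalNumberAttributes));  // ceil(3.46) = 4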

If you take a look at the Apache Mahout DecisionTreeBuilder, you will find the randomAttributes method below. This method selects the m attributes to consider for a split.

org.apache.mahout.classifier.df.builder.DecisionTreeBuilder

/**
   * Randomly selects m attributes to consider for split, excludes IGNORED and LABEL attributes
   *
   * @param rng      random-numbers generator
   * @param selected attributes' state (selected or not)
   * @param m        number of attributes to choose
   * @return list of selected attributes' indices, or null if all attributes have already been selected
   */
  private static int[] randomAttributes(Random rng, boolean[] selected, int m) {
    int nbNonSelected = 0; // number of non selected attributes
    for (boolean sel : selected) {
      if (!sel) {
        nbNonSelected++;
      }
    }

    if (nbNonSelected == 0) {
      log.warn("All attributes are selected !");
      return NO_ATTRIBUTES;
    }

    int[] result;
    if (nbNonSelected <= m) {
      // return all non selected attributes
      result = new int[nbNonSelected];
      int index = 0;
      for (int attr = 0; attr < selected.length; attr++) {
        if (!selected[attr]) {
          result[index++] = attr;
        }
      }
    } else {
      result = new int[m];
      for (int index = 0; index < m; index++) {
        // randomly choose a "non selected" attribute
        int rind;
        do {
          rind = rng.nextInt(selected.length);
        } while (selected[rind]);

        result[index] = rind;
        selected[rind] = true; // temporarily set the chosen attribute to be selected
      }

      // the chosen attributes are not yet selected
      for (int attr : result) {
        selected[attr] = false;
      }
    }

    return result;
  }

Given the list of chosen attributes, the builder loops through each one, computes the information gain of the split generated by that attribute, and keeps track of the best split found so far.


 // find the best split
    Split best = null;
    for (int attr : attributes) {
      Split split = igSplit.computeSplit(data, attr);
      if (best == null || best.getIg() < split.getIg()) {
        best = split;
      }
    }
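
Mahout's igSplit object encapsulates the information gain computation. Conceptually, for a classification tree the gain of a candidate split is the entropy of the parent node minus the weighted entropy of the child nodes. A rough standalone sketch of that idea (not Mahout code):

// Information gain from class counts: parent counts vs. counts in each child partition.
public class InfoGain {
    static double entropy(int[] counts) {
        int total = 0;
        for (int c : counts) total += c;
        double h = 0.0;
        for (int c : counts) {
            if (c > 0) {
                double p = (double) c / total;
                h -= p * Math.log(p) / Math.log(2);   // log base 2
            }
        }
        return h;
    }

    static double informationGain(int[] parentCounts, int[][] childCounts) {
        int parentTotal = 0;
        for (int c : parentCounts) parentTotal += c;
        double weightedChildEntropy = 0.0;
        for (int[] child : childCounts) {
            int childTotal = 0;
            for (int c : child) childTotal += c;
            weightedChildEntropy += ((double) childTotal / parentTotal) * entropy(child);
        }
        return entropy(parentCounts) - weightedChildEntropy;
    }
}

For example, a parent with class counts {5, 5} split cleanly into children {5, 0} and {0, 5} yields the maximum gain of 1 bit, whereas a split that leaves both children at {5, 5} yields a gain of 0.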

After finding the best split, if its information gain is below a small threshold (EPSILON in the code), we stop and create a leaf node with a label.
For regression, the label is the regression value, computed as the average of the label values in the node's data. For classification, the label is the majority label, i.e. the value that appears most often.

 // information gain is near to zero.
    if (best.getIg() < EPSILON) {
      double label;
      if (data.getDataset().isNumerical(data.getDataset().getLabelId())) {
        label = sum / data.size();
      } else {
        label = data.majorityLabel(rng);
      }
      log.debug("ig is near to zero Leaf({})", label);
      return new Leaf(label);
    }