Vipul Pathak

Sunday, August 16, 2015

Creating An ElasticSearch Index With Schema and Analyzers

.

Analysis and Analyzers

Elasticsearch breaks (tokenizes) the data in a document and builds an index of words (tokens). For each token, the index points to the documents that contain it. The words (tokens) are transformed in a particular manner before being stored in the index. This process of breaking a document into a set of words and then transforming each word as per the specification is known as Analysis. Analysis is performed by configured software components commonly referred to as Analyzers. Analyzers come bundled with Elasticsearch (like the Standard Analyzer), but they can also be configured to act in a custom manner, i.e. Custom Analyzers.

We can instruct Elasticsearch to analyze input documents in the way we want. This is done by defining custom analyzers, and we can do it while creating a new index as well as by updating an existing one.

Defining Custom Analyzer During Index Creation

Below is the structure of the request that creates an index, maps individual fields to proper data types and defines a custom analyzer –

PUT /members_idx
{
    "settings": {
        "index": { },
        "analysis": {
            "char_filter": { },
            "tokenizer":   { },
            "filter":      { },
            "analyzer":    { }
        }
    },
    "mappings": {
        "members": { }
    }
}

The request above defines an index and associates analyzer(s) with it. The settings section defines the characteristics of the index and its analyzers, while the mappings section defines the schema of a "type" and links individual fields with analyzers.

The index section could be something like this –

    "index": {
         "number_of_shards": 2,
         "number_of_replicas": 1,
         "refresh_interval": "2s"
     }

In the index sub-section within settings, we can specify many different settings, though shards and replicas are the most common and significant.

The top-level settings section contains the analysis sub-section, which groups all the configuration needed for defining analyzers. Four types of configuration are grouped under analysis – character filters, tokenizers, filters and analyzers.

Character filters are invoked first when analysis begins. We can define one or more character filters in the char_filter section. Each character filter’s properties must be defined under its name. In the example below, I have defined a character filter "symbol_to_word" that acts on the input string and replaces special characters like "!", "<" etc. with their word equivalents, like "not", "less than" etc.

    "char_filter": {
         "symbol_to_word": {
             "type": "mapping",
             "mappings": [
                 "/=>or",
                 "!=>not",
                 "<=>less than",
                 ">=>greater than",
                 "*=>x",
                 "%=>percent"
             ]
         }
     }

Character filters are optional and execute on the raw input string before tokenization happens. A tokenizer, however, is not optional: each analyzer must use exactly one tokenizer to tell Elasticsearch how to break the input string into terms. We can use the built-in tokenizers (like standard), or we can define a new tokenizer as per our needs –

    "tokenizer": {
         "semicolon_tokenizer": {
             "type": "pattern",
             "pattern": ";+"
         } }

In this example, I have defined a tokenizer that breaks the input string at semicolons (";"). The pattern is actually a regular expression that says: look for one or more semicolons in the input string and split on them to produce terms. If no semicolon is found, the whole string is treated as one term. Therefore, a string like "Michael Jackson; Indiana Jones; Jeff Richter; Vipul Pathak" will be broken at the semicolons and each name will become a term.

The filter block within the analysis block defines the third building block of the analyzer. If we include this section, we can define one or more token filters; defining filters is itself optional. Filters are applied in the chain after tokenization is complete and are called for each token, in exactly the order they are listed in the analyzer. The goal of a filter is to transform or remove a term before it is included in the index. Filters of various types can be defined in a single filter block –

    "filter": {
        "the_sgn_filter": {
            "type": "stop",
            "stopwords": [ "CANTT", "MAKKE", "ITT" ]
        },
        "eng_hindi_syn_filter": {
            "type": "synonym",
            "ignore_case": true,
            "synonyms_path": "eng_hindi_synonyms.txt"
        }, "engram_filter": { "type": "edgeNGram", "min_gram": 2, "max_gram": 17 }
    }

The filter of type "stop" is used as a Stop Filter and it matches the list of word it has with the terms. If a match is found, the term is dropped and not included in the index. An "edgeNGram" filter generates an edge ngram of terms, i.e. nGrams that are between 2 and 17 characters of length and their one end are stick to the edge of the term, e.g. "typeahead" with generate nGrams like these- "ty", "typ", "type", "typea", "typeah", "typeahe", "typeahea" and "typeahead" (all starting from t) but not like- "ype" or "ahead".

A synonym filter generates synonyms of the terms it sees, so at search time the original term and its synonyms match identically; this increases the chance of a match. The list of synonyms can be supplied inline (like stop words) if the list is small, or a separate synonym file can be supplied that lists all the synonyms in the format shown below –

    sgn => shri ganeshay namah
    aum => om
    swagat => welcome
    namaskar => greetings
    ghadi => watch
    sanganak => computer
    guru => teacher

In the above synonym list, I am trying to match spoken Hindi words to their English equivalents. :-)

Finally, all three building blocks need to be wired together in the analyzer block, which can contain one or more custom analyzers. A custom analyzer basically groups the use of character filters, a tokenizer and term filters. Since there can be more than one character filter and more than one term filter, they are accepted as arrays, i.e. the "char_filter" and "filter" fields inside the custom analyzer are supplied within square brackets, "[" and "]".

    "analyzer": {
        "description_analyzer": {
            "type": "custom",
            "char_filter": [ "html_strip", "symbol_to_word" ],
            "tokenizer": "standard",
            "filter": [ "uppercase", "the_sgn_filter", "eng_hindi_syn_filter" ]
        },
        "name_analyzer": {
            "type": "custom",
            "char_filter": [ "symbol_to_word" ],
            "tokenizer": "standard",
            "filter": [ "uppercase", "engram_filter" ]
        },
        "email_analyzer": {
            "type": "custom",
            "char_filter": [ "html_strip" ],
            "tokenizer": "semicolon_tokenizer",
            "filter": [ "lowercase", "trim" ]
        }
    }

Here, I am defining 3 custom analyzers. Every custom analyzer is defined with the type "custom". Then we specify which char_filter(s) to use, which tokenizer to use and which term filter(s) to apply. I have defined one analyzer for the description field, called description_analyzer. It uses 2 character filters: one that strips HTML markup entities (like &amp; and &lt;) and another that converts certain characters to specific English words, e.g. ! is converted to not and < is converted to less than. It uses the standard tokenizer, which breaks the text at whitespace and punctuation, but we can use a custom tokenizer as well, like the semicolon_tokenizer used in the email_analyzer. The semicolon_tokenizer splits the string at the ; character instead of at whitespace. Term filters like lowercase and uppercase convert the case of the terms.
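
Before indexing any documents, we can sanity-check an analyzer with the _analyze API. This is only a rough sketch: the exact request format varies across Elasticsearch versions (in the 1.x line the analyzer and text can be passed as query parameters, while newer versions expect a JSON body with "analyzer" and "text" fields), and the sample text here is invented –

GET /members_idx/_analyze?analyzer=email_analyzer&text=Tim_May@example.com;%20JEFF_RICHTER@example.com

This should return two terms, "tim_may@example.com" and "jeff_richter@example.com", confirming that the semicolon tokenizer, the lowercase filter and the trim filter ran in that order.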

Let's define the schema for a type and apply the analyzers we have defined so far. The index I am creating is named members_idx and the type I am going to create is members. In RDBMS parlance, we can think of an index as a kind of database and a type as a kind of table. We define the schema by sending an HTTP request to the index URI. The request below puts everything we have discussed so far together, including the settings part and the mappings part –

POST /members_idx
{
    "settings": {
        "index": {
            "number_of_shards": 2,
            "number_of_replicas": 1,
            "refresh_interval": "2s"
        },
        "analysis": {
            "char_filter": {
                "symbol_to_word": {
                    "type": "mapping",
                    "mappings": [
                        "/=> or ",
                        "!=> not ",
                        "<=> less than ",
                        ">=> greater than ",
                        "*=> x ",
                        "%=> percent"
                    ]
                }
            },
            "tokenizer": {
                "semicolon_tokenizer": {
                    "type": "pattern",
                    "pattern": ";+"
                }
            },
            "filter": {
                "the_sgn_filter": {
                    "type": "stop",
                    "stopwords": [ "CANTT", "MAKKE", "ITT" ]
                },
                "eng_hindi_syn_filter": {
                    "type": "synonym",
                    "ignore_case": true,
                    "synonyms_path": "eng_hindi_synonyms.txt"
                },
                "engram_filter": {
                    "type": "edgeNGram",
                    "min_gram": 2,
                    "max_gram": 18
                }
            },
            "analyzer": {
                "description_analyzer": {
                    "type": "custom",
                    "char_filter": [ "html_strip", "symbol_to_word" ],
                    "tokenizer": "standard",
                    "filter": [ "uppercase", "the_sgn_filter", "eng_hindi_syn_filter" ]
                },
                "name_analyzer": {
                    "type": "custom",
                    "char_filter": [ "symbol_to_word" ],
                    "tokenizer": "standard",
                    "filter": [ "uppercase", "engram_filter" ]
                },
                "email_analyzer": {
                    "type": "custom",
                    "char_filter": [ "html_strip" ],
                    "tokenizer": "semicolon_tokenizer",
                    "filter": [ "lowercase", "trim" ]
                }
            }
        }
    },
    "mappings": {
        "members": {
            "_all": {
                "enabled": true
            },
            "_timestamp": {
                "enabled": true,
                "path": "date_of_birth",
                "format": "dd-MM-YYYY",
                "default": "31-03-2001",
                "ignore_missing": true
            },
            "_ttl": {
                "enabled": true,
                "default": "3m"
            },
            "properties": {
                "name": {
                    "type": "string",
                    "store": true,
                    "index": "analyzed",
                    "null_value": "(null)",
                    "index_analyzer": "name_analyzer",
                    "search_analyzer": "name_analyzer",
                    "fields": {
                        "raw": {
                            "type": "string",
                            "index": "not_analyzed"
                        }
                    }
                },
                "age": {
                    "type": "integer"
                },
                "gender": {
                    "type": "string",
                    "store": true,
                    "index": "no",
                    "include_in_all": false
                },
                "date_of_birth": {
                    "type": "date",
                    "format": "dd-MM-YYYY"
                },
                "address": {
                    "type": "string"
                },
                "city": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "zip": {
                    "type": "long"
                },
                "state": {
                    "type": "string",
                    "copy_to": "my_meta"
                },
                "country": {
                    "type": "string",
                    "copy_to": "my_meta"
                },
                "phone": {
                    "type": "string",
                    "store": true,
                    "index": "not_analyzed"
                },
                "email_address": {
                    "type": "string",
                    "analyzer": "email_analyzer"
                },
                "profession": {
                    "type": "string",
                    "analyzer": "description_analyzer"
                },
                "description": {
                    "type": "string",
                    "analyzer": "description_analyzer"
                },
                "my_meta": {
                    "type": "string",
                    "store": "yes"
                }
            }
        }
    }
}

The mappings block above defines the schema of the members type. It defines each field along with its data type and analyzer. For example, looking at the description field above, its data type is set to string and the analyzer we want to run on this field is description_analyzer.

Any field within the mappings section whose index setting is set to not_analyzed is stored in the index, but no analysis is done on it. If the index setting is set to no, the field is not indexed at all and cannot be searched. The only other possible value for the index setting is analyzed, in which case the field is analyzed and then indexed. The phone field above is not_analyzed in our case.

We can specify a copy_to setting on any field. The idea of copy_to is to copy the content of that field into a destination field of the same data type. In our example above, the fields state and country are copied to a field called my_meta, which we can also query.

We defined a field email_address of data type string. The data in this field looks like this: "m_jackson@example.com; i_jones@anotherexample.com; j_richter@example.com; v_pathak@yetanotherexample.com". We have applied the email_analyzer to this field. This analyzer tokenizes the string at ";", converts the terms to lowercase and trims the spaces around each term. The tokenized email addresses are then stored in the index.
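
For completeness, a sample document can be indexed against this mapping like this (the field values below are invented purely for illustration) –

PUT /members_idx/members/1
{
    "name": "Indiana Jones",
    "age": 36,
    "gender": "M",
    "date_of_birth": "01-07-1979",
    "city": "New York",
    "email_address": "i_jones@anotherexample.com",
    "description": "Professor of archaeology & part-time adventurer"
}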

Elasticsearch offers special meta fields whose names start with an underscore (_). I have enabled some of these, like _all, _timestamp and _ttl. The _all field stores content from all the fields and is searched when a query doesn’t specify which field to search. The _timestamp field is populated by Elasticsearch with the current time when a document is indexed; we can override this behavior by specifying a field (via path) from which the value should be picked up instead. The _ttl field defines the maximum age of a document, which is useful for expiring documents automatically based on time.

Finally, Elasticsearch gives us the ability to define different analyzers for index time and for query time, which is what we used in the definition of our name field. We can ask Elasticsearch to execute one analyzer when a document is indexed and another when the field is queried. The settings index_analyzer and search_analyzer specify these analyzers, though in our example I specified the same analyzer for both. For the name field, the name_analyzer uses the edgeNGram filter, which enables type-ahead style functionality. For a name like Indiana, the edgeNGram filter generates terms of increasing length that start with the first letter of the word, i.e. "In", "Ind", "Indi", "India", "Indian" and "Indiana". Therefore, if "Ind" is typed during a type-ahead search, it will match "Indiana".
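
To see the type-ahead behavior end to end, a simple match query on the name field is enough (a hypothetical query, assuming a document like the one indexed above exists) –

GET /members_idx/members/_search
{
    "query": {
        "match": { "name": "Ind" }
    }
}

A document whose name contains a word beginning with "Ind" (such as "Indiana Jones") is returned, because the indexed edge n-grams include the term "IND".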

.

Sunday, August 02, 2015

EMP and DEPT Tables For Use With Hive and Pig

.

The EMP and DEPT tables are pretty popular among Oracle users; they are very handy for quickly trying new queries. Oracle also has a DUAL table that is useful for evaluating expressions, like “SELECT (SYSDATE + 1/24) AS OneHourFromNow FROM DUAL“. These tables don’t exist in Hive, but we can create them on our own.

I have created these tables for trying out Hive queries and even Pig scripts. I created the following two data files –

EMP

7369,SMITH,CLERK,7902,17-DEC-1980,800,\000,20
7499,ALLEN,SALESMAN,7698,20-FEB-1981,1600,300,30
7521,WARD,SALESMAN,7698,22-FEB-1981,1250,500,30
7566,JONES,MANAGER,7839,2-APR-1981,2975,\000,20
7654,MARTIN,SALESMAN,7698,28-SEP-1981,1250,1400,30
7698,BLAKE,MANAGER,7839,1-MAY-1981,2850,\000,30
7782,CLARK,MANAGER,7839,9-JUN-1981,2450,\000,10
7788,SCOTT,ANALYST,7566,09-DEC-1982,3000,\000,20
7839,KING,PRESIDENT,\000,17-NOV-1981,5000,\000,10
7844,TURNER,SALESMAN,7698,8-SEP-1981,1500,0,30
7876,ADAMS,CLERK,7788,12-JAN-1983,1100,\000,20
7900,JAMES,CLERK,7698,3-DEC-1981,950,\000,30
7902,FORD,ANALYST,7566,3-DEC-1981,3000,\000,20
7934,MILLER,CLERK,7782,23-JAN-1982,1300,\000,10

DEPT

10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON

DUAL


I created folders in HDFS and then used HDFS’ copyFromLocal command to upload these text files. Next, I created the following Hive table definitions –
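
For reference, the upload steps look something like this (a sketch; the local file names are assumptions, and the HDFS paths are chosen to match the LOCATION clauses used below) –

$ hadoop fs -mkdir /user/vpathak/Data/emp     # on Hadoop 2.x and later, add -p to create parent directories
$ hadoop fs -mkdir /user/vpathak/Data/dept
$ hadoop fs -copyFromLocal emp.txt  /user/vpathak/Data/emp/
$ hadoop fs -copyFromLocal dept.txt /user/vpathak/Data/dept/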

EMP Table Creation

CREATE EXTERNAL TABLE IF NOT EXISTS emp (
    emp_no int,
    ename string,
    job string,
    mgr_id int,
    date_of_joining string,
    salary int,
    bonus int,
    dept_no int)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
    NULL DEFINED AS '00'
STORED AS TEXTFILE
LOCATION 'hdfs://localhost:9000/user/vpathak/Data/emp'
;

DEPT Table Creation

CREATE EXTERNAL TABLE IF NOT EXISTS dept (
    dept_no  int,
    d_name   string,
    l_loc    string)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 'hdfs://localhost:9000/user/vpathak/Data/dept'
;

DUAL Table Creation

CREATE TABLE IF NOT EXISTS dual ( name  string )
ROW FORMAT DELIMITED LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 'hdfs://localhost:9000/user/vpathak/Data/dual' ;

Usage

Now that the tables are created in Hive, I can use them for quick tryouts –

hive> SELECT format_number(100000/6, 2) AS SixParts FROM dual;          -- Returns 16,666.67
hive> SELECT pow(2, 8) AS ByteCapacity FROM dual;                       -- Returns 256
hive> SELECT from_unixtime(unix_timestamp()) AS CurrentTime FROM dual;  -- Returns the current time
hive> SELECT 'Vipul Pathak' AS HiveUser FROM dual;                      -- Returns a static string
hive>
hive> SELECT e.emp_no, e.ename, e.bonus, d.d_name
        FROM emp AS e JOIN dept AS d
          ON (e.dept_no = d.dept_no) ;

Finally, for Pig scripts, I created the following Pig relations and placed them in the ~/.pigbootup file:

emp = LOAD 'hdfs://localhost:9000/user/vpathak/Data/emp'
      USING PigStorage(',')
      AS (emp_no: INT, ename: CHARARRAY, job: CHARARRAY,
          mgr_id: INT, doj: CHARARRAY, salary: FLOAT,
          bonus: FLOAT, dept_no: INT);

dept = LOAD 'hdfs://localhost:9000/user/vpathak/Data/dept'
       USING PigStorage(',')
       AS (dept_no: INT, d_name: CHARARRAY, d_loc: CHARARRAY) ;

These tables are pretty useful for trying out different kinds of queries (like back in the Oracle days).   :-)

..

Friday, July 24, 2015

Cloudera Certified Developer for Apache Hadoop (CCDH)

.

Happy to share that I have earned the CCDH certification  :-)

Cloudera Certified Developer For Apache Hadoop


Verification URL:  http://certification.cloudera.com/verify   (with License # 100-013-285)

.

The exam had loads of conceptual as well as programming questions, including multiple-choice questions.

Reading Hadoop: The Definitive Guide and Programming Hive a couple of times and rigorously practicing the MapReduce programming model were instrumental in clearing the exam. Setting up my own Hadoop cluster on Amazon EC2 was a great learning experience.

There is lots of fun in Big Data  :-)

.

Saturday, July 18, 2015

Writing a Generic UDF in Hive

.

There are a few types of UDFs that we can write in Hive.

  • Functions that act on each column value passed to them, e.g. Select Length(name) From Customer
    • Specific functions written for a specific data type (simple UDFs)
    • Generic functions written to work with more than one data type
  • Functions that act on a group of values and emit a single result, e.g. Select AVG(salary) From Employees
  • Functions that act on a single value and explode it to generate multiple rows

In an earlier article, we discussed simple UDFs. Here we will focus on the generic functions that act on each column value.

Here we will write a generic UDF that doubles the value of numeric columns, or doubles the length of string columns by concatenating the string value with itself –

GenericDouble

package com.hive.in.action.assignments;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.lazy.LazyDouble;
import org.apache.hadoop.hive.serde2.lazy.LazyInteger;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

@Description(
    name     = "GenericDouble",
    value    = "_FUNC_( value) : Double the value of numeric argument, " +
               "Concatinate value to itself for string arguments.",
    extended = "Example:\n" +
               "    SELECT _FUNC_(salary) FROM customers;\n" +
               "    (returns 12,000 if the salary was 6,000)\n\n" +
               "    SELECT _FUNC_(name) FROM customers;\n" +
               "    (returns \"Tim MayTim May\" if the name was \"Tim May\")\n"
)
/**
 * This class is a Generic User Defined Function meaning that we can call this
 * function with more than one type of arguments, i.e. int, long, float, double
 * and String. The function returns the same type of output as it gets the input.
 *
 * @author vpathak
 */
public final class GenericDouble extends GenericUDF {

    private ObjectInspector[] _inputObjectInspector = null;
    private GenericUDFUtils.ReturnObjectInspectorResolver
                           _returnObjectInspectorResolver = null;


    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {

        Object returnable = null;
        Object preparedOutput = null;
        String argumentType = arguments[0].get().getClass().getSimpleName();

        // System.out.println("Arguments[0]: Type: " + argumentType);

        // Check the type of argument ...
        if (argumentType.equalsIgnoreCase("LazyInteger"))    // select UDF(emp_no) from EMP
        {
            // The input parameter is a LazyInteger ...
            LazyInteger lazyOut = new LazyInteger((LazyInteger) arguments[0].get());

            IntWritable underlyingInt = lazyOut.getWritableObject();
            underlyingInt.set( underlyingInt.get() * 2 );

            preparedOutput = lazyOut;
        }
        else if (argumentType.equalsIgnoreCase("IntWritable"))  // select UDF(9) from DUAL
        {
            // The input parameter is an IntWritable ...
            IntWritable inputParameter = (IntWritable) arguments[0].get();

            // Hive runs MR jobs in the background and Mappers/Reducers keep using
            // the same object as parameter, only the value is set() for iteration,
            // the Writable object remains the same. Therefore, we should be returning
            // a new object, instead of making changes in input Object's value.
            preparedOutput = new IntWritable( inputParameter.get() * 2 );

        }
        else if (argumentType.equalsIgnoreCase("LazyDouble"))    // select UDF(bonus) from EMP
        {
            // The input parameter is a LazyDouble ...
            LazyDouble lazyOut = new LazyDouble((LazyDouble) arguments[0].get());

            DoubleWritable underlyingDouble = lazyOut.getWritableObject();
            underlyingDouble.set( underlyingDouble.get() * 2 );

            preparedOutput = lazyOut;
        }
        else if (argumentType.equalsIgnoreCase("DoubleWritable")) // select UDF(2.23) from dual;
        {
            // The input parameter is a DoubleWritable ...
            final DoubleWritable inputParameter = (DoubleWritable) arguments[0].get();

            // We should be returning a new object, instead of mutating the input.
            preparedOutput = new DoubleWritable( inputParameter.get() * 2 );
        }
        else if (argumentType.equalsIgnoreCase("LazyString"))    // select UDF(Job) from EMP
        {
            // The input parameter is a LazyString (a lazily wrapped Text) ...
            LazyString lazyOut = new LazyString((LazyString) arguments[0].get());

            Text underlyingText = lazyOut.getWritableObject();
            underlyingText.set( underlyingText.toString() + underlyingText.toString() );

            preparedOutput = lazyOut;
        }
        else if (argumentType.equalsIgnoreCase("Text"))  // select UDF('Clerk') from dual
        {
            // The input parameter is a Text ...
            final Text inputParameter = (Text) arguments[0].get();

            // We should be returning a new object, instead of mutating the input.
            preparedOutput = new Text( inputParameter.toString() + inputParameter.toString() );
        }


        // Check input type (inputObjectInspector) and set the appropriate
        // output data type (outputValue) ...
        returnable = _returnObjectInspectorResolver.convertIfNecessary(preparedOutput, _inputObjectInspector[0]);

        return returnable;
    }


    @Override
    /**
     * This method is called within a Hive session (e.g.  hive> _ ) when the UDF is
     * called and an error occurs. The method gets a chance to put as much information
     * possible to show to the user. A standard string "HiveException: Error evaluating"
     * is prepended in front of whatever is returned by this function-
     *
     * e.g.  "HiveException: Error evaluating value in column emp_no (type: int)"
     */
    public String getDisplayString(String[] errorInfo)
    {
        return "value in column " + errorInfo[0] + " (type: " + _inputObjectInspector[0].getTypeName() + ").";
    }


    @Override
    /**
     * Called once before any call to evaluate(). It validates the argument types
     * and provides the ObjectInspector for the return type.
     */
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

        // Save the input Object Inspectors ...
        _inputObjectInspector = arguments;

        // Validate: Argument Count ...
        if (arguments.length <= 0 || arguments[0] == null) {
            throw new UDFArgumentException("No argument was detected.");
        }

        // Create the instance of the most important object within this class ...
        _returnObjectInspectorResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);

        // Validate: Argument type checking ...
        if (_returnObjectInspectorResolver.update(arguments[0]) == false) {
            throw new UDFArgumentTypeException(0, "Datatype problem with the passed argument.");
        }

        return _returnObjectInspectorResolver.get();
    }
}

You can pass multiple arguments to the UDF, but they are not presented to your code as plain Java values. In initialize() you receive an array of ObjectInspector objects, one per argument: arguments[0] is the inspector for the first argument you passed to the UDF, arguments[1] the inspector for the second, and so on. In evaluate() you receive the corresponding values as DeferredObjects. ObjectInspectors are helpful for looking into the internal structure of an object.

getDisplayString()

The getDisplayString() method is really helpful to the developer, since it can return meaningful troubleshooting information. Instead of returning a generic error message, Hive calls this method whenever there is an error executing the UDF, so the UDF developer can compile information that is instrumental in troubleshooting the runtime error/exception. When a problem is detected while executing the UDF, Hive throws a HiveException but appends the information returned by getDisplayString() to the exception. In the above example, this method returns the name and type of the column that caused the problem.

Why Hive Provides The ObjectInspectors

As the name suggests, an ObjectInspector helps us inspect the arguments the UDF is going to receive. Since Hive has a variety of data types and allows very complex custom type definitions, Hive UDFs can be passed very basic data types (primitives like long, double, boolean) as well as very complex ones (like an array of maps with a string key and a struct value, where the struct contains Name, Age, Salary and Location, i.e. ARRAY<MAP<STRING, STRUCT<STRING, INT, FLOAT, STRING>>>). Since UDFs are called on table columns within a query, columns with really complex data types can be passed to them.

It is because of this possible complexity of the data types that can be passed to a generic UDF (which stays flexible about types until runtime) that Hive passes an ObjectInspector rather than just the object itself: the UDF code must first understand the structure of the object and then process it. Similarly, the processed output can be equally complex, so an ObjectInspector for the output value is also required; Hive uses it when you return the processed output.

ObjectInspectors are of great use within a generic UDF, and we access the values of the passed parameters using them. There are ObjectInspectors for practically all types, categorized into PrimitiveObjectInspector, ListObjectInspector, MapObjectInspector and StructObjectInspector.

All the specialized ObjectInspectors derive from these four, e.g. LazyDoubleObjectInspector, which helps us deal with a lazily deserialized double value, extends a class that implements PrimitiveObjectInspector. The ObjectInspector of a complex object can return ObjectInspectors of its underlying objects, e.g. myArrayObjInsp.getListElementObjectInspector() returns an inspector that can be cast to a StandardMapObjectInspector if the array in the UDF input contains map objects.

initialize()

When a UDF is used in a query, Hive loads the UDF into memory. initialize() is called first, when the UDF is invoked; the purpose of this call is to check the types of the arguments that will be passed to the UDF. The evaluate() method is then called for each value passed to the UDF, so if the UDF is applied to 10 rows, evaluate() will be called 10 times. Hive always calls initialize() on the generic UDF before any call to evaluate(). The goals of initialize() are to

  • validate the input arguments and complain if input is not as per expectation
  • save the Object Inspectors of input arguments for later use during evaluate()
  • provide an Object Inspector to Hive for the return type

There are various ways to validate the input, like checking the arguments array for size, checking the category of the input type (remember PrimitiveObjectInspector, MapObjectInspector etc.?), or checking the size of the underlying objects (in the case of a Map or Struct etc.). Validation can go to any extent that you choose, including traversing the entire object hierarchy and validating every object. When validation fails, we throw a UDFArgumentException or one of its subtypes to indicate the error.

The ObjectInspector for the return type should be constructed within the initialize() method and returned. We can use the factory methods of the ObjectInspectorFactory class. For example, if the UDF is going to return a MAP type, we can use the getStandardMapObjectInspector() method, which accepts information about how the map is constructed (i.e. the key type and the value type of the map).
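
For illustration only (this is not part of the GenericDouble UDF above, whose return type simply mirrors its input), an initialize() for a hypothetical UDF returning MAP<STRING, INT> could end like this:

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

        // ... validate the arguments here, as discussed above ...

        // Declare MAP<STRING, INT> as the return type of this hypothetical UDF.
        return ObjectInspectorFactory.getStandardMapObjectInspector(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector,   // key inspector
                PrimitiveObjectInspectorFactory.javaIntObjectInspector);     // value inspector
    }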

The saved ObjectInspectors are instrumental when we obtain the input values in the evaluate() method.

evaluate()

SELECT GenericDouble(bonus) FROM emp;

Suppose the emp table has 10 rows in it. Then the evaluate() method will be called 10 times, once for the column value in each row. The values passed to evaluate(), however, are serialized bytes. Hive delays the instantiation of objects until the object is actually requested, hence the name DeferredObject. Based on the type of value passed to the UDF, the DeferredObject could wrap a lazily initialized object; in the example above it could be an instance of the LazyDouble class. When the value is requested, e.g. via LazyDouble.getWritableObject(), the bytes are deserialized into an object and returned.

However, if the same GenericUDF is called with a literal value provided in the query (instead of a value read from storage), the argument could be a DoubleWritable object in the first place and doesn’t need deserialization. Based on the type of object we get as input, we need to access its data accordingly and process it.

Finally, based on the type of input we received, we want to return the same type of output, since we just double the input and return it. The convertIfNecessary() method helps with this and converts the output to the same type as the input, based on the ObjectInspector we pass to it.
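
To try GenericDouble from the Hive shell, the registration steps mirror those shown in the simple UDF article (the jar name and path below are placeholders for wherever your build produces the artifact):

hive> ADD JAR file:///path/to/GenericDouble-1.jar;
hive> CREATE TEMPORARY FUNCTION GenericDouble AS 'com.hive.in.action.assignments.GenericDouble';
hive> SELECT GenericDouble(salary), GenericDouble(ename) FROM emp;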
.

Thursday, June 25, 2015

Loading Data in Pig Using HCatalog

.

HCatalog is an extension of Hive; in a nutshell, it exposes the schema information in the Hive metastore so that applications outside of Hive can use it.

The objective of HCatalog is to hold the following types of information about the data in HDFS –

  • Location of the data
  • Metadata about the data (e.g. Schema etc)

This frees the scripts and MR jobs that act on the data from worrying about the location of the files or the schema of the data. HCatalog exposes open APIs that other tools (like Teradata Aster) can use to benefit from it.

In Pig, we can use HCatalog (like the following) to load data without specifying its location –

$ pig -useHCatalog
.  .  .
grunt> exch_log = LOAD 'Exchange_Log' USING org.apache.hive.hcatalog.pig.HCatLoader();

and to store data into an already created Hive table, we can use a command similar to the following –

grunt> STORE exch_log INTO 'Exchange_Log' USING org.apache.hive.hcatalog.pig.HCatStorer();

The table schema should be defined in Hive before you try to store the data, and the relation being stored should contain only fields that are present in the Hive table.

Note the presence of “hive” in the package name before hcatalog, which documents the fact that HCatalog is part of a bigger project called Hive :-).  HCatalog’s home has changed from org.apache.hcatalog (earlier releases) to org.apache.hive.hcatalog (recent releases).

.

Saturday, June 13, 2015

Writing Simple UDF in Hive

.

There are a few types of UDFs that we can write in Hive.

  • Functions that act on each column value passed to them, e.g. Select Length(name) From Customer
    • Specific functions written for a specific data type
    • Generic functions written to work with more than one data type (GenericUDF)
  • Functions that act on a group of values and emit a single result, e.g. Select AVG(salary) From Employees
  • Functions that act on a single value and explode it to generate multiple rows

Here we will focus on the specific functions that act on each column value (the first sub-type in the list above). There is another article that discusses Generic UDFs.

Here we will write a UDF that sets the width of a column's output, either by padding it with spaces or by trimming it to a specified size. As a first step, we add dependencies for Hive and Hadoop in pom.xml –

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>${hadoopVersion}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>${hiveVersion}</version>
    </dependency>

Here you can define the versions of Hive and Hadoop as per your wish. I defined these properties as follows –

  <properties>
    <hadoopVersion>1.2.1</hadoopVersion>
    <hiveVersion>0.14.0</hiveVersion>
  </properties>

Next, I ran the following command to download Hive, Hadoop and other dependencies before creating the Eclipse project and writing the code –

$ mvn  clean  compile

Once the dependencies are downloaded, I created a Java project and wrote the following Java class for the Hive UDF –

SetSize


package com.hive.in.action.assignments;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.io.Text;

@Description(
        name     = "SetSizeStr",
        value    = "_FUNC_(String columnNameToPad, int desiredSize) : Pad or Cut the string to make it the desired size.",
        extended = "Example:\n" +
                   "    SELECT _FUNC_(name, 35) FROM customers;\n"
)
/**
 * This UDF acts on values of a single String column and returns one output value
 * per input column value it receives.
 *
 * @author vpathak
 */
public final class SetSize extends UDF {

    // This method is called by Hive for each column value.
    public Text evaluate(final Text inText, int nSize) {

        // Validate input ...
        if (inText == null) {
            return null;
        }

        // Extract the string from the Text object and calculate its length ...
        String sReturnText = inText.toString();
        int nLen = sReturnText.trim().length();

        // Cut or Pad the string ...
        if (nLen > nSize) {
            sReturnText = sReturnText.substring(0, nSize);
        } else {
            StringBuffer strBuff = new StringBuffer(sReturnText.trim());
            for (int nCtr = 0; nCtr < (nSize-nLen); ++nCtr) {
                strBuff.append(' ');
            }
            sReturnText = strBuff.toString();
        }

        // Return the new string ...
        return new Text(sReturnText);
    }
}

The user-defined function we write to extend the functionality of Hive should be a class that extends Hive’s UDF class. Hive executes the evaluate() method of the class whose signature matches the argument data types. This means we can overload evaluate() and implement multiple variants of it. For example, if evaluate() is implemented with (Text, int) and (int, int) signatures and at runtime we call SetSize("Sample", 10), Hive will call the (Text, int) version. An evaluate() method must return a value; the return type can’t be void.
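
For instance, a second overload could look like the sketch below. It is hypothetical (not part of the class shown above) and would also need an import for org.apache.hadoop.io.IntWritable:

    // Hypothetical numeric overload: Hive would pick this variant for calls
    // like SetSize(emp_no, 10), where the first argument is an integer column.
    public Text evaluate(final IntWritable inNumber, int nSize) {
        if (inNumber == null) {
            return null;
        }
        // Convert the number to text and reuse the String-based overload above.
        return evaluate(new Text(String.valueOf(inNumber.get())), nSize);
    }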

I built the Jar of the Java class by using the following Maven command –

$ mvn  clean  install

Upon successful completion of the command, the target sub-directory contains the SetSize-1.jar file.

To use the UDF in Hive, we must first add the jar file and then create a function definition in Hive, like this –

$ hive
.  .  .
hive> ADD JAR file:///Users/vpathak/Data/Technical/Hive/UDFs/SetSize/target/SetSize-1.jar;
hive> CREATE TEMPORARY FUNCTION SetSize AS 'com.hive.in.action.assignments.SetSize';

As we can see, the Java class we wrote is associated with a temporary function named SetSize in Hive. The Description annotation that decorates our class contains the usage information for our UDF, but the usage information doesn’t contain the name of our UDF; instead it says _FUNC_. Once we create a temporary function, Hive substitutes that name (SetSize) for _FUNC_. This allows the same implementation to be used under different function names, since users can create a function name of their choice for the functionality.

So if we use the DESCRIBE FUNCTION or DESCRIBE FUNCTION EXTENDED command, Hive will display the usage information but will use the SetSize function name instead of _FUNC_, like this –

hive> DESCRIBE FUNCTION EXTENDED SetSize;
OK

SetSize(String columnNameToPad, int desiredSize) : Pad or Cut the string to make it the desired size.
Synonyms: setsize
Example:
    SELECT SetSize(name, 35) FROM customers;

Time taken: 0.031 seconds, Fetched: 5 row(s)
hive>

Generic UDFs are discussed in a separate article.

.

Monday, June 08, 2015

Loading an HDFS Data File in Pig

Filed under: Big Data,Hadoop,Pig,Programming,Technical — Vipul Pathak @ 21:23

.

emp = LOAD '/path/to/data/file/on/hdfs/Employees.txt' [ USING PigStorage(' ') ] AS (
    emp_id: INT,
    name: CHARARRAY,
    joining_date: DATETIME,
    department: INT,
    salary: FLOAT,
    mgr_id: INT,
    residence: BAG {
            b: (addr1: CHARARRAY, addr2: CHARARRAY, city: CHARARRAY)
    }) ;

The alias for the data in the file “Employees.txt” is emp; using emp, we can refer to individual fields and perform actions like filtering, grouping etc. Note that the text file doesn’t have named columns (it is structured, though); we have defined a schema on top of the file using the LOAD command. Every field name and relation name must start with a letter and can only contain A-Z, a-z, 0-9 and the underscore character. Field and relation names are case sensitive, whereas keywords like LOAD and DUMP are case insensitive in Pig 0.14.0.

The parameter to LOAD is actually a resource locator, i.e. it can be an HDFS file location, an HBase table, a JDBC connection or even a web service URL. A load function extends Pig’s LoadFunc class and encapsulates Hadoop’s InputFormat; it tells the LOAD operation how to read the data and interpret its storage format. TextLoader, PigStorage and HCatLoader are such functions.

If we LOAD data into a relation using TextLoader(), no structure is defined and each line is loaded as plain text (unstructured). On the other hand, if we use PigStorage(), we can load the data in a structured form and can even define the schema using the AS clause.
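
A minimal contrast of the two loaders might look like this (the file path is a placeholder):

-- Unstructured: each line of the file becomes a single chararray field.
raw_lines = LOAD '/path/to/Employees.txt' USING TextLoader() AS (line: CHARARRAY);

-- Structured: split each line on a delimiter and apply a schema.
emp = LOAD '/path/to/Employees.txt' USING PigStorage(',')
      AS (emp_id: INT, name: CHARARRAY, salary: FLOAT);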

.

Saturday, June 06, 2015

Writing EvalFunc UDF in Pig

.

UDFs (User Defined Functions) are the way to extend Pig's functionality. There are two types of UDFs that we can write in Pig –

  • Evaluate functions (extend the EvalFunc base class)
  • Load/Store functions (extend the LoadFunc / StoreFunc base classes)

Here we will develop an Evaluate UDF step by step. Let's start by conceptualizing a UDF (named VowelCount) that returns an integer representing the count of vowels in a string.


Including Pig Jar files for Pig 0.14.0  in pom.xml

<properties>
    <hadoopVersion>1.2.1</hadoopVersion>
    <pigVersion>0.14.0</pigVersion>
</properties>

<dependencies>

    <dependency>
        <groupId>org.apache.pig</groupId>
        <artifactId>pig</artifactId>
        <version>${pigVersion}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>${hadoopVersion}</version>
    </dependency>
    
.  .  .
.  .  .

Implementing UDF

package com.pig.ni.action.assignments.udf;

import org.apache.pig.EvalFunc;
import org.apache.pig.PigWarning;
import org.apache.pig.data.Tuple;

import java.io.IOException;


public class VowelCount extends EvalFunc<Integer> {

    public Integer exec(Tuple inputVal) throws IOException {

        // We will check the input string for these characters ...
        String[] setOfVowels = new String[]{"a", "e", "i", "o", "u"};

        // Validate Input Value ...
        if (inputVal == null ||
            inputVal.size() == 0 ||
            inputVal.get(0) == null) {

            // Emit warning text for user, and skip this iteration
            // Fail this iteration but don't fail the entire task.
            // Throwing an exception will fail the entire task, while
            // returning NULL will only fail this UDF call ...
            super.warn("Inappropriate parameter, either value missing or null. " +
                       "Skipping ...",
                       PigWarning.SKIP_UDF_CALL_FOR_NULL);
            return null;
        }

        // Count vowels in this string ...
        final String inputString = ((String) inputVal.get(0)).toLowerCase();
        int vowelCount = 0;
        int recentlyReportedIndex = 0;

        try {

            // Check each vowel ...
            for(String thisVowel : setOfVowels) { // Find "a" in "Vipul Pathak"

                recentlyReportedIndex = 0;

                // Keep counting until -1 is returned by indexOf() method ...
                while (true){

                    // Where is next vowel located ?
                    recentlyReportedIndex = inputString.indexOf(thisVowel, recentlyReportedIndex);

                    // Not found (-1 is failure to find) ..... Break the loop
                    if (recentlyReportedIndex < 0) {
                        break;

                    } else {
                        // Vowel found
                        ++vowelCount;

                        // Next search for this vowel should be after this index.
                        ++recentlyReportedIndex;
                    }
                }

            }
        } catch (Exception e) {
            throw new IOException("VowelCount: Fatal error condition. Aborting execution: ", e);
        }

        return vowelCount;

    }

}

Using the UDF

$ pig
grunt> REGISTER /Users/vpathak/Data/VowelCount/target/VowelCount-1.jar;
grunt> DEFINE VC com.pig.ni.action.assignments.udf.VowelCount;
.  .  .
grunt> emp = LOAD 'hdfs://localhost:9000/user/vpathak/Data/emp/emp.txt' USING PigStorage(',') AS (emp_no: INT, ename: CHARARRAY, mgr_id: INT, job: CHARARRAY, salary: INT);
grunt> emp_processed = FOREACH emp GENERATE emp_no, VC(ename) AS TotalVowels;
grunt>

This is the simplest type of UDF that we can write in Pig.

.

Sunday, May 31, 2015

Filtering and Limiting Data in Pig

Filed under: Big Data,Hadoop,Pig,Programming,Technical — Vipul Pathak @ 19:00

.

-- emp  = LOAD 'Employees.txt' ... Data in text file resembles the "EMP" table in Oracle
-- dept = LOAD 'Dept.txt' ........ Data in text file resembles the "DEPT" table in Oracle
-- Filter data in emp to only those whose job is Clerk.
Filtered_Emp = FILTER emp BY (job == 'CLERK');

-- Supports filtering of records based on Regular Expression (MATCHES keyword)
-- A logical NOT operation can be applied to the criteria, however the NOT operator
-- should be applied before column name, e.g. NOT ename MATCHES (ename not matches
-- is Invalid). Regular Expression is the standard expression and is specified using
-- the MATCHES keyword.
Emp_Names_Without_A = FILTER emp BY (NOT ename MATCHES '.*[A|a].*');

-- The LIMIT operator works the same way as TOP in many RDBMS queries,
-- e.g. TOP 20 in SQL query is same as using LIMIT  20 here.
-- The guarantee here is to pick 20 tuples from the relation, but no
-- guarantee which 20.
Any_4_Employees = LIMIT emp 4;

.

Saturday, May 30, 2015

Pig Data Types

.

  • Simple: INT and FLOAT are 32 bit signed numeric datatypes backed by java.lang.Integer and java.lang.Float
  • Simple: LONG and DOUBLE are 64 bit signed numeric Java datatypes
  • Simple: CHARARRAY (Unicode backed by java.lang.String)
  • Simple: BYTEARRAY (Bytes / Blob, backed by Pig’s DataByteArray class that wraps byte[])
  • Simple: BOOLEAN (“true” or “false” case sensitive)
  • Simple: DATETIME (Supported format “1970-01-01T00:00:00.000+00:00”).
  • Simple: BIGDECIMAL and BIGINTEGER (size same as in Java)
  • Complex: TUPLE (resembles a Row and is an ordered “Set” of values) : Indicated by ( Parentheses ).
  • Complex: BAG (an unordered “Collection” of Tuples- {(tuple4), (tuple1), …} ) : Indicated by { Curly Braces }.
  • Complex: MAP (a “Collection” of Key-Value pairs: [‘state’#’MD’, ‘name’#’Vipul’] ) : Indicated by [ Square Brackets and stores Key#Value].

Complex types can have other complex datatypes as fields, e.g. a MAP may contain a tuple as its value, and that tuple may have a bag as one of its fields. A map’s key must be a chararray, while the value can be of any type. The smallest element of data is an atom (basically a cell), and a collection of atoms makes a tuple. A GROUP operation returns a bag of grouped rows (tuples), which shows that these complex data types can be nested. Pig is a gently typed language (neither strongly nor weakly typed): if schema and type information is provided, Pig enforces it, but if the type information is not provided, Pig still allows the fields to be used and tries to infer their data types from how they are used.
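
To make this concrete, here is a hedged sketch of a LOAD schema that nests these types (the field names and file path are made up for illustration) –

-- A tuple field, a bag of tuples and a typed map in one schema.
people = LOAD '/path/to/people.txt' USING PigStorage('\t') AS (
    name:   CHARARRAY,
    home:   TUPLE(city: CHARARRAY, zip: INT),
    phones: BAG{t: (kind: CHARARRAY, number: CHARARRAY)},
    props:  MAP[CHARARRAY]
);

-- GROUP returns one tuple per key; its second field is a bag holding the grouped tuples.
by_name = GROUP people BY name;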

.

 

Thursday, May 28, 2015

Sample Pig Statements Demonstrating Many Functions

Filed under: Big Data,Hadoop,Pig,Programming,Technical — Vipul Pathak @ 21:47

.

Pig Statements

-- Load command loads the data
-- Every placeholder like "A_Rel" and "Filter_A" are called Alias, and they are useful
-- in holding the relation returned by pig statements. Aliases are relations (not variables).
A_Rel = LOAD '/hdfs/path/to/file' [AS (col_1[: type], col_2[: type], col_3[: type], ...)] ;

-- Record set returned by a command is called "Relation"
-- Filter command applies a filter condition on the Data Set / Record Set (Relation)
-- A filter can be viewed as a WHERE clause by SQL enthusiasts.
-- Assignment Operator is =    Equality Comparison Operator is ==
Filter_A = FILTER A_Rel BY (col_1 == 'New York');

-- The DUMP command prints the content of a relation on screen. Every row in a relation
-- is termed a "Tuple". This will also trigger the underlying MR job.
DUMP Filter_A;

-- Group By operation in Pig Latin is as easy as applying the GROUP command
-- The grouped result will be stored in Alias "Grp_A"
Grp_A = GROUP A_Rel BY col_1;  -- Grp_A is an alias.

-- As it is already known, DUMP will emit the content of Grp_A on screen.
-- The content will contain a Relation that contains the grouping column value
-- followed by a group of unordered tuples (which is called a Bag, similar to a List),
-- followed by another grouping column value ... etc.
--
-- The first column (e.g. with values like- col_1_val etc) will contain a column called
-- group with value of grouping column, "col_1" in this case. The second column will be
-- the name of grouped relation, (e.g. A_Rel in this case) and that contains all the tuples
-- with same value of col_1 in it.
--
-- Like:   group       | A_Rel
--         ------------|---------------------------
--         col_1_val,  | { {tuple 1}, {tuple 4} }
--         col_2_val,  | { {tuple 2}, {tuple 3} }
--         ------------|---------------------------
DUMP Grp_A;  -- Until DUMP, you are working on Logical Plan only.

-- Join between 2 relations is a breeze, just say- JOIN and list the relation (R2 or R3) and a
-- BY keyword followed by column name OR (zero based) column index ($0 or $3) for that set.
r2 = LOAD '/path/to/another/data/set' ;
r3 = LOAD 'table_3' USING HCatLoader();  -- If table definition was stored in HCatalog
Joined_C = JOIN r2 BY $0, r3 BY emp_id;

Relations (or record sets) can refer to and use fields that are associated with an alias, e.g. in a FILTER statement we specify an alias and use its field in an expression: “filtered_sals = FILTER salaries BY income > 70000”. Here salaries is an alias pointing to a relation and income is a field within that relation.

 

Wednesday, May 27, 2015

Apache Pig at a High Level

Filed under: Big Data,Hadoop,HDFS,Map-Reduce,Pig,Technical — Vipul Pathak @ 11:28

Pig is a high-level data flow language developed at Yahoo. Pig programs are translated into lower-level instructions supported by the underlying execution engine (e.g. MapReduce programs). Pig is designed to perform complex operations (e.g. joins) with speed.

Pig has mainly 2 components: the interpreter and Pig Latin (the scripting language).
Pig supports the following execution modes –

  1. Script Mode: Script containing Pig Latin statements.
  2. Grunt Shell Mode: This is interactive mode, where we start the Grunt shell and enter Pig Latin statements interactively. 
  3. Embedded Mode: Using the PigServer java class, we can execute a Pig query from within the Java class.

Grunt Modes

  • Local: pig -x local [script_name.pig]
  • MapReduce: pig [-x mapreduce] [hdfs://…/script_name.pig]
  • Tez: pig -x tez [script_name.pig]

Pig statements are executed by the Pig interpreter after validation is done. Instructions in the script/statements are stored in a logical plan that the interpreter builds; they are executed, after being converted into a physical plan, when a STORE, DUMP or ILLUSTRATE statement is encountered. Pig internally uses the MapReduce or Tez execution engine, and Pig scripts are eventually converted into MR (or Tez) jobs to do anything they offer.

Comments
Multiline comments /* are covered by C style comment markers */. Single line comments are marked with -- which is more of a SQL-like commenting style.

The design philosophy of the Pig tool is often compared to the actual pig (the animal)

  • Pigs eat anything: Pig (the tool) can process any kind of data (structured or unstructured, with or without metadata, nested/relational/key-value).
  • Pigs can live anywhere: Pig (the tool) initially supported only MapReduce but isn’t designed for only one execution engine. Pig also works with Tez, another execution engine that works on the Hadoop platform.
  • Pigs are domestic animals: Like the animal, Pig (the tool) is adjustable and modifiable by users. Pig supports a wide variety of user-defined code, in the form of UDFs, stream commands, custom partitioners etc.
  • Pigs fly: Pig scripts are fast to execute, and Pig evolves in a way that it remains fast and doesn’t become heavyweight.

Install Pig by downloading it from pig.apache.org and untarring it (tar -xvf <tar_file_name>). Define the Pig installation directory in the environment variable PIG_PREFIX and add $PIG_PREFIX/bin to the PATH variable. Also define the HADOOP_PREFIX environment variable. Start Pig’s Grunt shell using the pig command; since HADOOP_PREFIX is defined, Pig will be able to connect to the Hadoop cluster seamlessly.
All HDFS commands work from inside the Grunt shell, e.g. we can directly issue commands like copyFromLocal, lsr, cd or pwd without prefixing them with "! hadoop".

Wednesday, May 20, 2015

Default Values Set in Job class in Map/Reduce v2

Filed under: Big Data,Hadoop,Map-Reduce,Technical — Vipul Pathak @ 10:34

.

The following values are set by default when we don’t set them explicitly –

Job.setInputFormatClass:    TextInputFormat  (Outputs the starting position of each line as Key (Long), and line content as Value (Text))
Job.setMapperClass:         Mapper           (Default mapper, called IdentityMapper in MRv1, which simply output Key/Value as is)
Job.setMapOutputKeyClass:   LongWritable     (The type of key in intermediate Mapper output)
Job.setMapOutputValueClass: Text             (The type of value in intermediate Mapper output)
Job.setPartitionerClass:    HashPartitioner  (Partitioner class that evaluate each record and decide which Reducer to send this record)
Job.setReducerClass:        Reducer          (Default Reducer, similar to IdentityReducer that output what goes in as an Input)
Job.setNumReduceTasks:      1                (By default only one Reducer will process output from all Mappers)
Job.setOutputKeyClass:      LongWritable     (The type of key in the Job’s output created by the Reducer)
Job.setOutputValueClass:    Text             (The type of value in the Job’s output created by the Reducer)
Job.setOutputFormatClass:   TextOutputFormat (Writes each record on a separate line using tab character as separator)

Similarly, the following settings are the defaults for the Streaming API when we don’t set them explicitly –

$ export HADOOP_TOOLS=$HADOOP_HOME/share/hadoop/tools
$ hadoop jar $HADOOP_TOOLS/lib/hadoop-streaming-*.jar
  -input           MyInputDirectory  \
  -output          MyOutputDirectory  \
  -mapper          /user/ubuntu/MyScript.sh  \  # This can be any Unix Command or even a Java class
  -inputformat     org.apache.hadoop.mapred.TextInputFormat  \
  -partitioner     org.apache.hadoop.mapred.lib.HashPartitioner  \
  -numReduceTasks  1  \
  -reducer         org.apache.hadoop.mapred.lib.IdentityReducer  \
  -outputformat    org.apache.hadoop.mapred.TextOutputFormat  \
  -io              text  \
  -D               stream.map.input.field.separator=\t  \
  -file            /user/ubuntu/MyScript.sh     # This needs to be transported to each Mapper node

Unlike the Java API, the mapper must be specified explicitly. Also, the Streaming API uses the old MapReduce API classes (org.apache.hadoop.mapred). The file packaging options -file or -archives are required only when we want to run our own script that is not already present on all cluster nodes. Unix pipes are not supported in the mapper; e.g. “cat dataFile.txt | wc” will not work.

.

Sunday, May 17, 2015

Apache Sqoop in a Nutshell

Filed under: Hadoop,HDFS,Sqoop,Technical — Vipul Pathak @ 15:46

.

Sqoop (SQL-to-Hadoop) is a utility used to transfer data between SQL-based relational data stores and Hadoop. Its main operations are importing data into Hadoop from supported relational data sources and exporting data back to them. Sqoop uses connectors as extensions to connect to data stores; currently it supports connectors to import/export data from popular databases like Oracle, MySQL, Postgres, SQL Server, Teradata, DB2 etc. While Sqoop can do more, general use includes the following –

  • Import data from database (sqoop  import  OR  sqoop-import command)
  • Export data to the database  (sqoop  export  OR  sqoop-export command)
  • Create a Hive table using the table structure of a similar table in a database

Sqoop uses MapReduce in the background to launch jobs that read/write data using the DBInputFormat class and process it in parallel using multiple mappers. Both import and export actions support the -m option, with which the user can specify the number of mappers Sqoop should launch to perform the operation in parallel. During import, either the --table option or the --query option can be used, but not both. Here is a summary of some options and their expected outcomes –

sqoop import
--connect jdbc:mysql://server/db32
--username user_1 --password pass2
--table src_table_name --target-dir /hdfs/dir/name
--as-textfile


Imports data from a MySQL database named db32 using the credentials user_1/pass2. Reads the table src_table_name and places the data in the HDFS directory /hdfs/dir/name as a text file.

.

sqoop import
--connect jdbc:mysql://server/db32
--username user_1 --password pass2
--table src_table_name
--columns "col1,col2,col3,col4"
--append  --target-dir /hdfs/dir/name
--as-sequencefile


Imports data with the same settings as above, but reads only the columns specified in the columns list, appends the data to an already existing HDFS directory and saves the data in sequence files.

sqoop import
--connect jdbc:mysql://server/db32
--username user_3 --password pass4
--query "SELECT id,name FROM tbl3 WHERE \$CONDITIONS"
--target-dir /hdfs/dir/name
--split-by UID   -m 6


Imports data from MySQL but uses a query to select the data instead of naming a table, and runs 6 mapper jobs (instead of the default 4) to read data in parallel, using the values of the UID column to create 6 queries that can run in parallel. If the UID column has values from 1 to 60,000, $CONDITIONS will be replaced to create ranges like 1 to 10000, 10001 to 20000, … etc. If the table doesn’t have a primary key column, then either use -m 1 or supply the --split-by clause.

.

sqoop create-hive-table
--connect jdbc:mysql://server/db32
--username user_3 --password pass4
--table src_table_name
--fields-terminated-by ','


Creates a Hive table with the same name and schema as the source table. The destination data files will be saved with fields separated by a comma, and if the destination table already exists, an error is thrown.

.

sqoop import
--connect jdbc:mysql://server/db32
--username user_3 --password pass4
--table src_table_name
--hive-import


Imports data from the source database, creates a table in Hive and then loads the data into that Hive table. If the Hive table already exists and contains data, the new data is appended.

.

sqoop export
--connect jdbc:mysql://server/db32
--username user_3 --password pass4
--table db_table_name
--export-dir /read/from/dir


Exports data from HDFS into the MySQL table db_table_name. The table must already exist in the database, and Sqoop will append data into it.

.

sqoop export
--connect jdbc:mysql://server/db32
--username user_3 --password pass4
--table db_table_name  --update-key UID
--export-dir /read/from/dir
--input-fields-terminated-by ','


Exports data from HDFS to the database, telling Sqoop that the fields in the input data are separated by commas; if the value of UID is already present in the database table, the row is updated instead of inserted.

.

Happy Sqooping  :-)

.

Thursday, May 14, 2015

Apache Hive From 5000 Feet

Filed under: Big Data,Hadoop,Hive,Technical — Vipul Pathak @ 16:33
.
Apache Hive is an abstraction on top of HDFS data that allows querying the data using a familiar SQL-like language called HiveQL (Hive Query Language). Hive was developed at Facebook to allow data analysts to query data using an SQL-like language. Hive has a limited command set and is similar to basic SQL (advanced SQL features are not supported), but it provides useful functionality on top of that limited syntax. Hive is intended to provide data-warehouse-like functionality on top of Hadoop and helps define structure for unstructured big data. It provides features to query analytical data using HiveQL.
Hive is not suitable for OLTP (since it is based on Hadoop and its queries have high latency), but it is good for running analytical batch jobs on high-volume data that is historical or non-mutable. A typical use case is copying unstructured data to HDFS, running multiple cycles of MapReduce to process the data and store (semi-)structured data files on HDFS, and then using Hive to define and apply the structure. Once the structure is defined, data analysts point to these data files (or load data from them) and run various types of queries to analyze the data and find hidden statistical patterns in it  :-)
Hive closely resembles SQL and, at a high level, supports the following types of activities –
  1. Creation and Removal of Databases
  2. Creation, Alteration and Removal of Managed/External Tables within (or outside of) these databases
  3. Insertion/Loading of data inside these Tables
  4. Selection of data using HiveQL, including the capability of performing Join operations
  5. Use of aggregation functions in SELECT queries, which most of the time triggers MapReduce jobs in the background (see the JDBC sketch after this list).
  6. Structured output at query time (i.e. schema-on-read): structure is applied not while loading data but while retrieving it.
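
As a concrete illustration of items 4 and 5, here is a minimal sketch (not from the original post) that runs a HiveQL aggregation through the HiveServer2 JDBC driver; the host name, credentials, table and column names are hypothetical, and the hive-jdbc jar is assumed to be on the classpath –

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveAggregationExample {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");

        // HiveServer2 is assumed to be listening on its default port 10000.
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:hive2://hive-server:10000/default", "hive", "");
             Statement stmt = conn.createStatement()) {

            // An aggregation like this typically triggers a MapReduce job in the background.
            ResultSet rs = stmt.executeQuery(
                "SELECT country, COUNT(*) AS member_count FROM members GROUP BY country");
            while (rs.next()) {
                System.out.println(rs.getString("country") + "\t" + rs.getLong("member_count"));
            }
        }
    }
}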
 .

Friday, July 26, 2013

Difference Between Software Architecture And Software Design

I had this question in my mind for years. Based on some experience, some good books (listed in good reads) and my constant interaction with some architecture geniuses at work, I now have clarity on this :-)

Generally speaking, architecture is a branch of design. So every architectural work is a kind of design, but the reverse is not true: not all design is architecture. Architectural design addresses a specific goal, and that goal is almost always to ensure that the system meets its quality attributes and behavioral requirements. The architect creates designs that manifest significant decisions and constraints in the system’s architectural structure. There are also design decisions that are not architecturally significant, i.e. that do not belong to the architecture branch of design. We can view them as a component’s internal design decisions, like the choice of algorithm, the selection of data structures etc. Any design decision that isn’t visible outside its component boundary is that component’s internal design and is non-architectural. These are the design decisions a system architect would leave to the module designer’s discretion or to the implementation team. The implementation team is free to design their subsystem as long as their design doesn’t break the architectural constraints imposed by the system-level architecture.

Architecture can be detailed or descriptive, such as defining a protocol that will be used by outside-world entities to interact with the system (e.g. a hypermedia RESTful interface for a service of a large system), or it may be constraints defined at a high level to support quality attributes, to be followed by the system’s internal implementation (e.g. the usage information of all 6 resources should be collected every 15 seconds and stored in-memory in binary format using no more than 16 bits per sample; the usage-info buffer should be uploaded to the server every 8 hours, or whenever coverage becomes available after 8 hours).

Architectural design deals with the quality attributes (performance, security, changeability, reliability, user experience etc.) and behavioral requirements (e.g. usage information of devices should be uploaded to the server with the least possible network footprint) of the system/element. Anything that needs design but is not in the above two categories IS the element’s internal design AND NOT the system’s architectural design. However, module designers may have to introduce structure or take design decisions to fulfill the behavioral requirements of their module. In that case, those design decisions are architectural to their module, but not to the entire system. Hence, whether a design decision is called architectural or not is a function of the context.

Saturday, July 20, 2013

What is Software Architecture – Some Thoughts

Filed under: Architecture,Technical — Vipul Pathak @ 03:49

 

Casually speaking, software architecture is the careful partitioning or subdivision of a system as a whole into sub-parts, with specific relationships between these sub-parts. This partitioning is what allows different people (or groups of people) to work together cooperatively to solve a much bigger problem than any of them could solve by dealing with it individually. Each person (or team) creates a software part (you may want to call it a component, module or sub-system) that interacts with other people’s (or teams’) software parts. This interaction happens through carefully crafted interfaces that expose the minimum but most stable information necessary for interaction.

The larger and more complex the system is, the more critical this subdivision becomes. A system can almost always, unavoidably, be divided into parts in more than one way, and each way of dividing it results in a different architectural structure. Each way of dividing the system into parts, together with the relationships between them and the way they interact (their interfaces), is the result of careful design to satisfy the driving quality attribute (security, performance, changeability etc.) and the business goal behind it. So a system may be subdivided into parts in different ways if the driving quality attribute or business goal is different.

Finally, a formal definition of software architecture from IEEE –

“The fundamental organization of a system embodied in its components, their relationships to each other, and to the environment, and the principles guiding its design and evolution.”

Why is architecture important?

Software architecture is a set of design decisions which are hard to revert later and which, if made incorrectly, may cause the project to be cancelled or to fail. Since these architectural design decisions are the ones that permit a system to meet its quality attributes and behavioral requirements, if these decisions go wrong, the system cannot achieve the desired quality or behavior. The quality attributes are usually part of the requirements (non-functional requirements), and failing to fulfill these requirements automatically invalidates the solution.


Friday, May 14, 2010

Status Returned By BlackBerry API During "Data Connection Refused" Conditions

The following readings were recorded on 13 mobile phones. Some of them (columns marked with *) were BlackBerry phones with a proper BlackBerry data plan, while some were simple non-BlackBerry phones (like the Nokia 6600, explicitly shown) with GPRS connectivity. One of them even lacked GPRS connectivity. I studied the data connectivity with the intention of recording the behaviour of RIM’s BlackBerry API in low or no data connectivity conditions.

The BlackBerry devices used in the study include the BlackBerry 9000 (Bold), BlackBerry 8320 (Curve), BlackBerry 8110 (Pearl), BlackBerry 8800, BlackBerry 8300 (Curve) etc. The BlackBerry JDE used was 4.3.

Rows 1 and 2 (Carrier? and MDS?) represent values returned by CoverageInfo.getCoverageStatus().
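
For context, here is a minimal sketch (not from the original post) of how those two readings can be taken, assuming the CoverageInfo bit flags COVERAGE_CARRIER and COVERAGE_MDS exposed by the 4.x BlackBerry JDE –

import net.rim.device.api.system.CoverageInfo;

public class CoverageProbe {
    // Returns a string like "Carrier=true, MDS=false" for logging into the table.
    public static String probe() {
        int status = CoverageInfo.getCoverageStatus();
        boolean carrier = (status & CoverageInfo.COVERAGE_CARRIER) != 0; // "Carrier?" row
        boolean mds     = (status & CoverageInfo.COVERAGE_MDS) != 0;     // "MDS?" row
        return "Carrier=" + carrier + ", MDS=" + mds;
    }
}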

All the “True +ve” cases shown above are real Data Connection Refused cases. Below is a cut-down version of the above table with only the required data. Two of the “True +ve” columns match each other (columns D and E below). However, column C (also a True +ve) matches all the other “False +ve” columns, e.g. column B.

Because column B is identified as Out of Coverage (even though the indicator on the home screen shows capital EDGE), the conclusion based on the various BlackBerry APIs is confusing.

Saturday, May 08, 2010

Setting READ/WRITE Permissions for the ISAPI Extension Hosted on IIS

The script (or the ISAPI extension) that we are going to deploy and grant additional permissions will be writing to a file and hence needs WRITE permission as well. Here is how we will do it:

Find out the User Id

IIS’s World Wide Web Publishing Service executes using the Local System account, but it usually impersonates a different account to execute any ISAPI extension. Do the following to discover which User Id is being used by IIS for impersonating and running the extension:

1. Start Internet Information Services Manager from Administrative Tools under Control Panel.

2. Expand the tree on the left side and select the Website under which the ISAPI extension will be deployed.

3. Next, on the right side, under the IIS group, double click on the Authentication icon (see below):

4. In the resulting listing, under the Features View tab, choose Anonymous Authentication and click on the Edit action.

5. The identity selected in the resulting dialog box is used by IIS for impersonating (see below):

If the selected identity isn’t shown clearly, and instead Application Pool Identity is shown as selected, here is how to find which account is used by the application pool:

6. From the left pane, select the Website and click on the Basic Settings link in the Actions area at the right. The resulting dialog will tell us which Application Pool is being used by this Website (see below):

7. Click Cancel to dismiss the dialog. Next, from the left pane, select the Application Pools node just above the Sites node. The right pane will show the available application pools. Select the application pool that’s being used by our Website.

8. Now click on the Advanced Settings action under the Actions area on the right side (see below):

9. The Identity property shows the account used by the application pool.

Setting the Directory Permission on NTFS

Now that we know which account is used by IIS for impersonation, let’s set WRITE permission on the correct directory for that user id. Say we have an ISAPI extension or a script that creates or updates a file on the local file system; we need to grant WRITE permission on the folder where our ISAPI extension will write, to the user id that IIS uses for impersonation (see below):

Finally, restart IIS; from the command line, use the IISReset command. We are done  :-)

Installing an ISAPI Extension as a Wildcard Handler

Filed under: Technical,Web — Vipul Pathak @ 13:47

Starting from IIS 6.0, an ISAPI extension can be added to any website or application as a wildcard script. By wildcard script, I mean an ISAPI extension registered with IIS to execute on every incoming request to the website/application. All we need to do is register the extension as a wildcard handler. A wildcard handler receives the incoming request before the actually intended recipient. There may be multiple wildcard handlers registered on the same site or application, and they will all be executed in order. The ISAPI extension, when invoked by IIS, can process the request and pass it on to the next handler in the chain. Below is how we register the wildcard script on an application (or website):

IIS 6.0

  1. Start IIS Manager and expand the tree on the left side.
  2. Select any Application/ Website on which you want to install the wildcard handler. Right click and select Properties.
    Application Properties
  3. In the Application Settings section on Home Directory or Virtual Directory tab, click on the Configuration.
  4. In the Wildcard Application Maps section on the Mappings tab, click the Insert button.
    Specifying a wildcard handler DLL
  5. Type or browse the path of our ISAPI Extension DLL and click OK 3 times to get back to where we started.

IIS 7.0

  1. Start IIS Manager and expand the tree on the left side.
  2. Select any Application/ Website on which you want to install the wildcard handler.
  3. On the right pane, within the features view, double click on the Handler Mappings icon under IIS section (as shown below).
    Mappings Handler Settings in IIS 7.0
    The Handler Mappings Shortcut in IIS 7.0
  4. Once the page containing all the existing mappings opens, locate the link Add Wildcard Script Map… at the right side and click it (as shown below).
    Link to Add a Wildcard Handler Script
  5. In the resulting dialog, specify the path of our ISAPI extension DLL and give it a name (Below):
    Specify the ISAPI Extension DLL
  6. We are done  :-)

Thursday, August 20, 2009

Hello World!

Filed under: Uncategorized — Vipul Pathak @ 13:26

Greetings !!

Wikipedia is a great source of free knowledge for nearly all of us. Let’s start by supporting Wikipedia.

There is also a great source of software engineering audio podcasts, which you can find at: http://www.se-radio.net. If you like the podcast, you may want to support SE-Radio too.

Will meet you soon here, with my first article.

Cheers.
