Friday, August 28, 2015

Big Data Basics - Part 1 - Introduction to Big Data


Problem

I have been hearing the term Big Data for a while now and would like to know more about it. Can you explain what this term means, how it evolved, and how we identify Big Data and any other relevant details?

Solution

Big Data has been a buzzword for quite some time now, and it is gaining popularity faster than pretty much anything else in the technology world. In this tip, let us understand what this buzzword is all about, why it is significant, why you should care about it, and more.

What is Big Data?

Wikipedia defines "Big Data" as a collection of data sets so large and complex that it becomes difficult to process using on-hand database management tools or traditional data processing applications.
In simple terms, "Big Data" consists of very large volumes of heterogeneous data that is being generated, often, at high speeds. These data sets cannot be managed and processed using traditional data management tools and applications at hand. Big Data requires the use of a new set of tools, applications and frameworks to process and manage the data.

Evolution of Data / Big Data

Data has always been around and there has always been a need for storage, processing, and management of data, since the beginning of human civilization and human societies. However, the amount and type of data that is captured, stored, processed, and managed has always depended on various factors: the necessity felt by humans, the tools/technologies available for storage, processing, and management, the effort/cost involved, and the ability to gain insights from the data and make decisions.
In ancient times, humans used very primitive ways of capturing/storing data, such as carving on stones, metal sheets, and wood. With new inventions and advancements a few centuries later, humans started capturing data on paper, cloth, etc. As time progressed, the medium of capture/storage/management became punch cards, followed by magnetic drums, laser disks, floppy disks, and magnetic tapes, and today we store data on devices like USB drives, compact discs, and hard drives.
In fact the curiosity to capture, store, and process the data has enabled human beings to pass on knowledge and research from one generation to the next, so that the next generation does not have to re-invent the wheel.
As we can clearly see from this trend, the capacity of data storage has been increasing exponentially, and today, with the availability of cloud infrastructure, one can potentially store unlimited amounts of data. Today Terabytes and Petabytes of data are being generated, captured, processed, stored, and managed.

Characteristics of Big Data - The Three V's of Big Data

When do we say we are dealing with Big Data? For some people 100GB might seem big, for others 1TB, for others 10TB, and something else for others. The term is qualitative and cannot really be quantified. Hence we identify Big Data by a few characteristics which are specific to it. These characteristics are popularly known as the Three V's of Big Data.
The three V's of Big Data are Volume, Velocity, and Variety, described below.


Volume

Volume refers to the size of the data that we are working with. With the advancement of technology and the rise of social media, the amount of data is growing very rapidly. This data is spread across different places, in different formats, in large volumes ranging from Gigabytes to Terabytes, Petabytes, and even more. Today, data is not only generated by humans; large amounts of data are being generated by machines, and machine-generated data surpasses human-generated data. This size aspect of data is referred to as Volume in the Big Data world.

Velocity

Velocity refers to the speed at which the data is being generated. Different applications have different latency requirements, and in today's competitive world, decision makers want the necessary data/information in the least amount of time possible - generally in near real time, or in real time in certain scenarios. In different fields and different areas of technology, we see data getting generated at different speeds. A few examples include trading/stock exchange data, tweets on Twitter, status updates/likes/shares on Facebook, and many others. This speed aspect of data generation is referred to as Velocity in the Big Data world.

Variety

Variety refers to the different formats in which the data is being generated/stored. Different applications generate/store data in different formats. In today's world, there are large volumes of unstructured data being generated apart from the structured data generated in enterprises. Until the advancements in Big Data technologies, the industry didn't have any powerful and reliable tools/technologies which could work with such voluminous unstructured data as we see today. In today's world, organizations not only need to rely on the structured data from enterprise databases/warehouses, they are also forced to consume lots of data that is being generated both inside and outside of the enterprise, like clickstream data and social media, to stay competitive. Apart from the traditional flat files, spreadsheets, relational databases, etc., we have a lot of unstructured data stored in the form of images, audio files, video files, web logs, sensor data, and many others. This aspect of varied data formats is referred to as Variety in the Big Data world.

Sources of Big Data

Just like the data storage formats, the sources of data have also evolved and are ever expanding, and there is a need to store the data in a wide variety of formats. With the evolution and advancement of technology, the amount of data that is being generated is ever increasing. Sources of Big Data can be broadly classified into six different categories, described below.


Enterprise Data

There are large volumes of data in enterprises in different formats. Common formats include flat files, emails, Word documents, spreadsheets, presentations, HTML pages/documents, pdf documents, XMLs, legacy formats, etc. This data that is spread across the organization in different formats is referred to as Enterprise Data.

Transactional Data

Every enterprise has some kind of applications which involve performing different kinds of transactions like Web Applications, Mobile Applications, CRM Systems, and many more. To support the transactions in these applications, there are usually one or more relational databases as a backend infrastructure. This is mostly structured data and is referred to as Transactional Data.

Social Media

This is self-explanatory. There is a large amount of data getting generated on social networks like Twitter, Facebook, etc. The social networks usually involve mostly unstructured data formats, which include text, images, audio, videos, etc. This category of data source is referred to as Social Media.

Activity Generated

There is a large amount of data being generated by machines, which surpasses the data volume generated by humans. These include data from medical devices, sensor data, surveillance videos, satellites, cell phone towers, industrial machinery, and other data generated mostly by machines. These types of data are referred to as Activity Generated data.

Public Data

This data includes data that is publicly available like data published by governments, research data published by research institutes, data from weather and meteorological departments, census data, Wikipedia, sample open source data feeds, and other data which is freely available to the public. This type of publicly accessible data is referred to as Public Data.

Archives

Organizations archive a lot of data which is either not required anymore or is very rarely required. In today's world, with hardware getting cheaper, no organization wants to discard any data; they want to capture and store as much data as possible. Other data that is archived includes scanned documents, scanned copies of agreements, records of ex-employees/completed projects, and banking transactions older than what compliance regulations require. This type of data, which is less frequently accessed, is referred to as Archive Data.

Formats of Data

Data exists in multiple different formats and the data formats can be broadly classified into two categories - Structured Data and Unstructured Data.
Structured data refers to the data which has a pre-defined data model/schema/structure and is often either relational in nature or is closely resembling a relational model. Structured data can be easily managed and consumed using the traditional tools/techniques. Unstructured data on the other hand is the data which does not have a well-defined data model or does not fit well into the relational world.
Structured data includes data in the relational databases, data from CRM systems, XML files etc. Unstructured data includes flat files, spreadsheets, Word documents, emails, images, audio files, video files, feeds, PDF files, scanned documents, etc.

Big Data Statistics

  • 100 Terabytes of data is uploaded to Facebook every day
  • Facebook Stores, Processes, and Analyzes more than 30 Petabytes of user generated data
  • Twitter generates 12 Terabytes of data every day
  • LinkedIn processes and mines Petabytes of user data to power the "People You May Know" feature
  • YouTube users upload 48 hours of new video content every minute of the day
  • Decoding of the human genome used to take 10 years. Now it can be done in 7 days
  • 500+ new websites are created every minute of the day
  • Source: Wikibon - A Comprehensive List of Big Data Statistics
In this tip we were introduced to Big Data: how it evolved, its primary characteristics, the sources of data, and a few statistics showing how large volumes of heterogeneous data are being generated at different speeds.

References
  • http://en.m.wikipedia.org/wiki/Big_data
  • http://strata.oreilly.com/2012/01/what-is-big-data.html
Next Steps
  • Explore more about Big Data. Do some of your own searches to see what you can find.
  • Stay tuned for future tips in this series to learn more about the Big Data ecosystem.

Hadoop FAQ

1. General

1.1. What is Hadoop?

Hadoop is a distributed computing platform written in Java. It incorporates features similar to those of the Google File System and of MapReduce. For some details, see HadoopMapReduce.

1.2. What platforms and Java versions does Hadoop run on?

Java 1.6.x or higher, preferably from Sun - see HadoopJavaVersions.
Linux and Windows are the supported operating systems, but BSD, Mac OS/X, and OpenSolaris are known to work. (Windows requires the installation of Cygwin).

1.3. How well does Hadoop scale?

Hadoop has been demonstrated on clusters of up to 4000 nodes. Sort performance on 900 nodes is good (sorting 9TB of data on 900 nodes takes around 1.8 hours) and can be improved using these non-default configuration values:
dfs.block.size = 134217728
dfs.namenode.handler.count = 40
mapred.reduce.parallel.copies = 20
mapred.child.java.opts = -Xmx512m
fs.inmemory.size.mb = 200
io.sort.factor = 100
io.sort.mb = 200
io.file.buffer.size = 131072
Sort performance on 1400 nodes and 2000 nodes is pretty good too - sorting 14TB of data on a 1400-node cluster takes 2.2 hours; sorting 20TB on a 2000-node cluster takes 2.5 hours. The updates to the above configuration are:
mapred.job.tracker.handler.count = 60
mapred.reduce.parallel.copies = 50
tasktracker.http.threads = 50
mapred.child.java.opts = -Xmx1024m

1.4. What kind of hardware scales best for Hadoop?

The short answer is dual processor/dual core machines with 4-8GB of RAM using ECC memory, depending upon workflow needs. Machines should be moderately high-end commodity machines to be most cost-effective; they typically cost 1/2 - 2/3 the cost of normal production application servers but are not desktop-class machines. This cost tends to be $2-5K. For a more detailed discussion, see the MachineScaling page.

1.5. I have a new node I want to add to a running Hadoop cluster; how do I start services on just one node?

This also applies to the case where a machine has crashed and rebooted, etc, and you need to get it to rejoin the cluster. You do not need to shutdown and/or restart the entire cluster in this case.
First, add the new node's DNS name to the conf/slaves file on the master node.
Then log in to the new slave node and execute:
$ cd path/to/hadoop
$ bin/hadoop-daemon.sh start datanode
$ bin/hadoop-daemon.sh start tasktracker

If you are using the dfs.include/mapred.include functionality, you will need to additionally add the node to the dfs.include/mapred.include file, then issue hadoop dfsadmin -refreshNodes and hadoop mradmin -refreshNodes so that the NameNode and JobTracker know of the additional node that has been added.

1.6. Is there an easy way to see the status and health of a cluster?

There are web-based interfaces to both the JobTracker (MapReduce master) and NameNode (HDFS master) which display status pages about the state of the entire system. By default, these are located at http://job.tracker.addr:50030/ and http://name.node.addr:50070/.
The JobTracker status page will display the state of all nodes, as well as the job queue and status about all currently running jobs and tasks. The NameNode status page will display the state of all nodes and the amount of free space, and provides the ability to browse the DFS via the web.
You can also see some basic HDFS cluster health data by running:
$ bin/hadoop dfsadmin -report

1.7. How much network bandwidth might I need between racks in a medium size (40-80 node) Hadoop cluster?

The true answer depends on the types of jobs you're running. As a back of the envelope calculation one might figure something like this:
60 nodes total on 2 racks = 30 nodes per rack.
Each node might process about 100MB/sec of data.
In the case of a sort job where the intermediate data is the same size as the input data, that means each node needs to shuffle 100MB/sec of data.
In aggregate, each rack is then producing about 3GB/sec of data.
However, given even reducer spread across the racks, each rack will need to send 1.5GB/sec to reducers running on the other rack.
Since the connection is full duplex, that means you need 1.5GB/sec of bisection bandwidth for this theoretical job. So that's 12Gbps.
However, the above calculations are probably somewhat of an upper bound. A large number of jobs have significant data reduction during the map phase, either by some kind of filtering/selection going on in the Mapper itself, or by good usage of Combiners. Additionally, intermediate data compression can cut the intermediate data transfer by a significant factor. Lastly, although your disks can probably provide 100MB sustained throughput, it's rare to see a MR job which can sustain disk speed IO through the entire pipeline. So, I'd say my estimate is at least a factor of 2 too high.
So, the simple answer is that 4-6Gbps is most likely just fine for most practical jobs. If you want to be extra safe, many inexpensive switches can operate in a "stacked" configuration where the bandwidth between them is essentially backplane speed. That should scale you to 96 nodes with plenty of headroom. Many inexpensive gigabit switches also have one or two 10GigE ports which can be used effectively to connect to each other or to a 10GE core.

1.8. How can I help to make Hadoop better?

If you have trouble figuring how to use Hadoop, then, once you've figured something out (perhaps with the help of the mailing lists), pass that knowledge on to others by adding something to this wiki.
If you find something that you wish were done better, and know how to fix it, read HowToContribute, and contribute a patch.

1.9. I am seeing connection refused in the logs. How do I troubleshoot this?

See ConnectionRefused

1.10. Why is the 'hadoop.tmp.dir' config default user.name dependent?

We need a directory that a user can write to without interfering with other users. If we didn't include the username, then different users would share the same tmp directory. This can cause authorization problems, if folks' default umask doesn't permit write by others. It can also result in folks stomping on each other, when they're, e.g., playing with HDFS and re-formatting their filesystem.

1.11. Does Hadoop require SSH?

Hadoop provided scripts (e.g., start-mapred.sh and start-dfs.sh) use ssh in order to start and stop the various daemons and some other utilities. The Hadoop framework in itself does not require ssh. Daemons (e.g. TaskTracker and DataNode) can also be started manually on each node without the script's help.

1.12. What mailing lists are available for more help?

A description of all the mailing lists is on the http://hadoop.apache.org/mailing_lists.html page. In general:
general is for people interested in the administrivia of Hadoop (e.g., new release discussion).
user@hadoop.apache.org is for people using the various components of the framework.
-dev mailing lists are for people who are changing the source code of the framework. For example, if you are implementing a new file system and want to know about the FileSystem API, hdfs-dev would be the appropriate mailing list.

1.13. What does "NFS: Cannot create lock on (some dir)" mean?

This actually is not a problem with Hadoop, but represents a problem with the setup of the environment it is operating in.
Usually, this error means that the NFS server to which the process is writing does not support file system locks. NFS prior to v4 requires a locking service daemon to run (typically rpc.lockd) in order to provide this functionality. NFSv4 has file system locks built into the protocol.
In some (rarer) instances, it might represent a problem with certain Linux kernels that did not implement the flock() system call properly.
It is highly recommended that the only NFS connection in a Hadoop setup be the place where the NameNode writes a secondary or tertiary copy of the fsimage and edits log. All other users of NFS are not recommended for optimal performance.

2. MapReduce

2.1. Do I have to write my job in Java?

No. There are several ways to incorporate non-Java code.
HadoopStreaming permits any shell command to be used as a map or reduce function.
libhdfs, a JNI-based C API for talking to hdfs (only).
Hadoop Pipes, a SWIG-compatible C++ API (non-JNI) to write map-reduce jobs.

2.2. How do I submit extra content (jars, static files, etc) for my job to use during runtime?

The distributed cache feature is used to distribute large read-only files that are needed by map/reduce jobs to the cluster. The framework will copy the necessary files from a URL (either hdfs: or http:) on to the slave node before any tasks for the job are executed on that node. The files are only copied once per job and so should not be modified by the application.
For streaming, see the HadoopStreaming wiki for more information.
Copying content into lib is highly discouraged. Changes in that directory will require Hadoop services to be restarted.
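As an illustration only, here is a minimal sketch in Java (old org.apache.hadoop.mapred API) of registering a read-only file with the distributed cache at submission time and locating the local copy from within a task; the class name and file path are hypothetical:

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class CacheFileExample {

    // At job submission time: register a read-only file that lives in HDFS.
    public static void registerLookupFile(JobConf conf) throws URISyntaxException {
        DistributedCache.addCacheFile(new URI("/user/hadoop/lookup/airports.txt"), conf); // hypothetical path
    }

    // Inside a Mapper/Reducer configure(JobConf) method: find the local copies
    // that the framework has already pulled down to this slave node.
    public static Path[] localCopies(JobConf conf) throws IOException {
        return DistributedCache.getLocalCacheFiles(conf);
    }
}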

2.3. How do I get my MapReduce Java Program to read the Cluster's set configuration and not just defaults?

The configuration property files ({core|mapred|hdfs}-site.xml) that are available in the various conf/ directories of your Hadoop installation need to be on the CLASSPATH of your Java application for them to be found and applied. Another way of ensuring that no set configuration gets overridden by any job is to set those properties as final; for example:
<name>mapreduce.task.io.sort.mb</name>
<value>400</value>
<final>true</final>

Setting configuration properties as final is a common thing Administrators do, as is noted in the Configuration API docs.
A better alternative would be to have a service serve up the Cluster's configuration to you upon request, in code. https://issues.apache.org/jira/browse/HADOOP-5670 may be of some interest in this regard, perhaps.

2.4. Can I create/write-to HDFS files directly from map/reduce tasks?

Yes. (Clearly, you want this since you need to create/write-to files other than the output-file written out by OutputCollector.)
Caveats:
${mapred.output.dir} is the eventual output directory for the job (JobConf.setOutputPath / JobConf.getOutputPath).
${taskid} is the actual id of the individual task-attempt (e.g. task_200709221812_0001_m_000000_0), a TIP is a bunch of ${taskid}s (e.g. task_200709221812_0001_m_000000).
With speculative-execution on, one could face issues with 2 instances of the same TIP (running simultaneously) trying to open/write-to the same file (path) on hdfs. Hence the app-writer will have to pick unique names (e.g. using the complete taskid i.e. task_200709221812_0001_m_000000_0) per task-attempt, not just per TIP. (Clearly, this needs to be done even if the user doesn't create/write-to files directly via reduce tasks.)
To get around this the framework helps the application-writer out by maintaining a special ${mapred.output.dir}/_${taskid} sub-dir for each reduce task-attempt on hdfs where the output of the reduce task-attempt goes. On successful completion of the task-attempt the files in the ${mapred.output.dir}/_${taskid} (of the successful taskid only) are moved to ${mapred.output.dir}. Of course, the framework discards the sub-directory of unsuccessful task-attempts. This is completely transparent to the application.
The application-writer can take advantage of this by creating any side-files required in ${mapred.output.dir} during execution of his reduce-task, and the framework will move them out similarly - thus you don't have to pick unique paths per task-attempt.
Fine-print: the value of ${mapred.output.dir} during execution of a particular reduce task-attempt is actually ${mapred.output.dir}/_${taskid}, not the value set by JobConf.setOutputPath. So, just create any hdfs files you want in ${mapred.output.dir} from your reduce task to take advantage of this feature.
For map task attempts, the automatic substitution of ${mapred.output.dir}/_${taskid} for ${mapred.output.dir} does not take place. You can still access the map task attempt directory, though, by using FileOutputFormat.getWorkOutputPath(TaskInputOutputContext). Files created there will be dealt with as described above.
The entire discussion holds true for maps of jobs with reducer=NONE (i.e. 0 reduces) since output of the map, in that case, goes directly to hdfs.
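To make the side-file mechanism above concrete, here is a minimal sketch in Java (old mapred API); the side-file name is hypothetical, and the framework promotes the file to ${mapred.output.dir} only if the task-attempt succeeds:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class SideFileExample {

    // Call from within a task: write a per-attempt side-file under the task's work directory.
    public static void writeSideFile(JobConf conf) throws IOException {
        Path workDir = FileOutputFormat.getWorkOutputPath(conf); // resolves to ${mapred.output.dir}/_${taskid}
        Path sideFile = new Path(workDir, "summary.txt");        // hypothetical file name
        FileSystem fs = sideFile.getFileSystem(conf);
        FSDataOutputStream out = fs.create(sideFile);
        out.writeBytes("per-task summary goes here\n");
        out.close();
    }
}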

2.5. How do I get each of a job's maps to work on one complete input-file and not allow the framework to split-up the files?

Essentially a job's input is represented by the InputFormat(interface)/FileInputFormat(base class).
For this purpose one would need a 'non-splittable' FileInputFormat, i.e. an input-format which essentially tells the map-reduce framework that the files cannot be split up and processed. To do this you need your particular input-format to return false for the isSplitable call.
E.g. org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat in src/test/org/apache/hadoop/mapred/SortValidator.java
In addition to implementing the InputFormat interface and having isSplitable(...) returning false, it is also necessary to implement the RecordReader interface for returning the whole content of the input file. (default is LineRecordReader, which splits the file into separate lines)
The other, quick-fix option, is to set mapred.min.split.size to large enough value.
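As a rough sketch of the first approach (old mapred API, class name made up), returning false from isSplitable makes each input file a single split; note that the default LineRecordReader would still produce one record per line, so returning the whole file as a single record additionally requires the custom RecordReader mentioned above:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TextInputFormat;

public class WholeFileTextInputFormat extends TextInputFormat {

    // One split per file: a single map task sees the complete file.
    @Override
    protected boolean isSplitable(FileSystem fs, Path file) {
        return false;
    }
}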

2.6. Why do I see broken images in the jobdetails.jsp page?

In hadoop-0.15, Map/Reduce task completion graphics were added. The graphs are produced as SVG (Scalable Vector Graphics) images, which are basically XML files embedded in HTML content. The graphics have been tested successfully in Firefox 2 on Ubuntu and Mac OS. However, for other browsers, one should install an additional plugin to the browser to see the SVG images. Adobe's SVG Viewer can be found at http://www.adobe.com/svg/viewer/install/.

2.7. I see a maximum of 2 maps/reduces spawned concurrently on each TaskTracker, how do I increase that?

Use the configuration knobs mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum to control the number of maps/reduces spawned simultaneously on a TaskTracker. By default, both are set to 2, hence one sees a maximum of 2 maps and 2 reduces at a given instance on a TaskTracker.
You can set those on a per-tasktracker basis to accurately reflect your hardware (i.e. set them to higher numbers on a beefier tasktracker, etc.).

2.8. Submitting map/reduce jobs as a different user doesn't work.

The problem is that you haven't configured your map/reduce system directory to a fixed value. The default works for single node systems, but not for "real" clusters. I like to use:
<property>
  <name>mapred.system.dir</name>
  <value>/hadoop/mapred/system</value>
  <description>The shared directory where MapReduce stores control files.</description>
</property>

Note that this directory is in your default file system and must be accessible from both the client and server machines and is typically in HDFS.

2.9. How do Map/Reduce InputSplit's handle record boundaries correctly?

It is the responsibility of the InputSplit's RecordReader to start and end at a record boundary. For SequenceFiles, every 2k bytes has a 20-byte sync mark between the records. These sync marks allow the RecordReader to seek to the start of the InputSplit (which contains a file, offset and length) and find the first sync mark after the start of the split. The RecordReader continues processing records until it reaches the first sync mark after the end of the split. The first split of each file naturally starts immediately and not after the first sync mark. In this way, it is guaranteed that each record will be processed by exactly one mapper.
Text files are handled similarly, using newlines instead of sync marks.

2.10. How do I change final output file name with the desired name rather than in partitions like part-00000, part-00001?

You can subclass the OutputFormat.java class and write your own. You can locate and browse the code of TextOutputFormat, MultipleOutputFormat.java, etc. for reference. It might be the case that you only need to do minor changes to any of the existing Output Format classes. To do that you can just subclass that class and override the methods you need to change.
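As a rough sketch only (new org.apache.hadoop.mapreduce API; the class name and the "results" prefix are made up), overriding getDefaultWorkFile in a TextOutputFormat subclass changes the part-file prefix, producing names like results-r-00000 instead of part-r-00000. It would be wired into a job with job.setOutputFormatClass(RenamedTextOutputFormat.class).

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class RenamedTextOutputFormat<K, V> extends TextOutputFormat<K, V> {

    @Override
    public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
        // "results" replaces the default "part" prefix; the committer later moves this
        // file from the task's work directory into the job's output directory.
        FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(context);
        return new Path(committer.getWorkPath(), getUniqueFile(context, "results", extension));
    }
}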

2.11. When writing a New InputFormat, what is the format for the array of strings returned by InputSplit#getLocations()?

It appears that DatanodeID.getHost() is the standard place to retrieve this name, and the machineName variable, populated in DataNode.java#startDataNode, is where the name is first set. The first method attempted is to get "slave.host.name" from the configuration; if that is not available, DNS.getDefaultHost is used instead.

2.12. How do you gracefully stop a running job?

hadoop job -kill JOBID

2.13. How do I limit (or increase) the number of concurrent tasks a job may have running total at a time?

2.14. How do I limit (or increase) the number of concurrent tasks running on a node?

For both answers, see LimitingTaskSlotUsage.

3. HDFS

3.1. If I add new DataNodes to the cluster will HDFS move the blocks to the newly added nodes in order to balance disk space utilization between the nodes?

No, HDFS will not move blocks to new nodes automatically. However, newly created files will likely have their blocks placed on the new nodes.
There are several ways to rebalance the cluster manually.
Select a subset of files that take up a good percentage of your disk space; copy them to new locations in HDFS; remove the old copies of the files; rename the new copies to their original names.
A simpler way, with no interruption of service, is to turn up the replication of files, wait for transfers to stabilize, and then turn the replication back down.
Yet another way to re-balance blocks is to turn off the data-node that is full, wait until its blocks are replicated, and then bring it back again. The over-replicated blocks will be randomly removed from different nodes, so you really get them rebalanced, not just removed from the current node.
Finally, you can use the bin/start-balancer.sh command to run a balancing process to move blocks around the cluster automatically. See
HDFS User Guide: Rebalancer;
HDFS Tutorial: Rebalancing;
HDFS Commands Guide: balancer.

3.2. What is the purpose of the secondary name-node?

The term "secondary name-node" is somewhat misleading. It is not a name-node in the sense that data-nodes cannot connect to the secondary name-node, and in no event it can replace the primary name-node in case of its failure.
The only purpose of the secondary name-node is to perform periodic checkpoints. The secondary name-node periodically downloads current name-node image and edits log files, joins them into new image and uploads the new image back to the (primary and the only) name-node. See User Guide.
So if the name-node fails and you can restart it on the same physical node then there is no need to shutdown data-nodes, just the name-node need to be restarted. If you cannot use the old node anymore you will need to copy the latest image somewhere else. The latest image can be found either on the node that used to be the primary before failure if available; or on the secondary name-node. The latter will be the latest checkpoint without subsequent edits logs, that is the most recent name space modifications may be missing there. You will also need to restart the whole cluster in this case.

3.3. Does the name-node stay in safe mode till all under-replicated files are fully replicated?

No. During safe mode replication of blocks is prohibited. The name-node waits until all or a majority of data-nodes report their blocks.
Depending on how safe mode parameters are configured, the name-node will stay in safe mode until a specific percentage of the blocks in the system is minimally replicated (dfs.replication.min). If the safe mode threshold dfs.safemode.threshold.pct is set to 1 then all blocks of all files should be minimally replicated.
Minimal replication does not mean full replication. Some replicas may be missing and in order to replicate them the name-node needs to leave safe mode.
Learn more about safe mode in the HDFS Users' Guide.

3.4. How do I set up a hadoop node to use multiple volumes?

Data-nodes can store blocks in multiple directories, typically allocated on different local disk drives. In order to set up multiple directories one needs to specify a comma-separated list of pathnames as the value of the configuration parameter dfs.datanode.data.dir. Data-nodes will attempt to place equal amounts of data in each of the directories.
The name-node also supports multiple directories, which in this case store the name space image and the edits log. The directories are specified via the dfs.namenode.name.dir configuration parameter. The name-node directories are used for the name space data replication so that the image and the log can be restored from the remaining volumes if one of them fails.

3.5. What happens if one Hadoop client renames a file or a directory containing this file while another client is still writing into it?

Starting with release hadoop-0.15, a file will appear in the name space as soon as it is created. If a writer is writing to a file and another client renames either the file itself or any of its path components, then the original writer will get an IOException either when it finishes writing to the current block or when it closes the file.

3.6. I want to make a large cluster smaller by taking out a bunch of nodes simultaneously. How can this be done?

On a large cluster removing one or two data-nodes will not lead to any data loss, because the name-node will replicate their blocks as soon as it detects that the nodes are dead. With a large number of nodes getting removed or dying, the probability of losing data is higher.
Hadoop offers the decommission feature to retire a set of existing data-nodes. The nodes to be retired should be included into the exclude file, and the exclude file name should be specified as a configuration parameter dfs.hosts.exclude. This file should have been specified during namenode startup. It could be a zero length file. You must use the full hostname, ip or ip:port format in this file. (Note that some users have trouble using the host name. If your namenode shows some nodes in "Live" and "Dead" but not decommission, try using the full ip:port.) Then the shell command
bin/hadoop dfsadmin -refreshNodes

should be called, which forces the name-node to re-read the exclude file and start the decommission process.
Decommission is not instant since it requires replication of potentially a large number of blocks and we do not want the cluster to be overwhelmed with just this one job. The decommission progress can be monitored on the name-node Web UI. Until all blocks are replicated the node will be in "Decommission In Progress" state. When decommission is done the state will change to "Decommissioned". The nodes can be removed whenever decommission is finished.
The decommission process can be terminated at any time by editing the configuration or the exclude files and repeating the -refreshNodes command.

3.7. Wildcard characters don't work correctly in FsShell.

When you issue a command in FsShell, you may want to apply that command to more than one file. FsShell provides a wildcard character to help you do so. The * (asterisk) character can be used to take the place of any set of characters. For example, if you would like to list all the files in your account which begin with the letter x, you could use the ls command with the * wildcard:
bin/hadoop dfs -ls x*


Sometimes, the native OS wildcard support causes unexpected results. To avoid this problem, enclose the expression in single or double quotes and it should work correctly.
bin/hadoop dfs -ls 'in*'

3.8. Can I have multiple files in HDFS use different block sizes?

Yes. HDFS provides an API to specify the block size when you create a file.
See FileSystem.create(Path, overwrite, bufferSize, replication, blockSize, progress)
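A minimal sketch of that call in Java (the path and sizes are just illustrative):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CustomBlockSize {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Create a file with a 128 MB block size regardless of the cluster default.
        FSDataOutputStream out = fs.create(
                new Path("/user/hadoop/bigblocks.dat"), // hypothetical path
                true,                                   // overwrite
                4096,                                   // buffer size in bytes
                (short) 3,                              // replication factor
                128L * 1024 * 1024);                    // block size in bytes
        out.writeBytes("data written into 128 MB blocks\n");
        out.close();
    }
}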

3.9. Does HDFS make block boundaries between records?

No, HDFS does not provide a record-oriented API and therefore is not aware of records and the boundaries between them.

3.10. What happens when two clients try to write into the same HDFS file?

HDFS supports exclusive writes only.
When the first client contacts the name-node to open the file for writing, the name-node grants a lease to the client to create this file. When the second client tries to open the same file for writing, the name-node will see that the lease for the file is already granted to another client, and will reject the open request for the second client.

3.11. How to limit Data node's disk usage?

Use dfs.datanode.du.reserved configuration value in $HADOOP_HOME/conf/hdfs-site.xml for limiting disk usage.
<property>
  <name>dfs.datanode.du.reserved</name>
  <!-- cluster variant -->
  <value>182400</value>
  <description>Reserved space in bytes per volume. Always leave this much space free for non dfs use.</description>
</property>

3.12. On an individual data node, how do you balance the blocks on the disk?

Hadoop currently does not have a method by which to do this automatically. To do this manually:
1. Take down the HDFS.
2. Use the UNIX mv command to move the individual blocks and meta pairs from one directory to another on each host.
3. Restart the HDFS.

3.13. What does "file could only be replicated to 0 nodes, instead of 1" mean?

The NameNode does not have any available DataNodes. This can be caused by a wide variety of reasons. Check the DataNode logs, the NameNode logs, network connectivity, ... Please see the page: CouldOnlyBeReplicatedTo

3.14. If the NameNode loses its only copy of the fsimage file, can the file system be recovered from the DataNodes?

No. This is why it is very important to configure dfs.namenode.name.dir to write to two filesystems on different physical hosts, use the SecondaryNameNode, etc.

3.15. I got a warning on the NameNode web UI "WARNING : There are about 32 missing blocks. Please check the log or run fsck." What does it mean?

This means that 32 blocks in your HDFS installation don’t have a single replica on any of the live DataNodes.
Block replica files can be found on a DataNode in storage directories specified by the configuration parameter dfs.datanode.data.dir. If the parameter is not set in the DataNode's hdfs-site.xml, then the default location /tmp will be used. This default is intended to be used only for testing. In a production system this is an easy way to lose actual data, as the local OS may enforce recycling policies on /tmp. Thus the parameter must be overridden.
If dfs.datanode.data.dir correctly specifies storage directories on all DataNodes, then you might have a real data loss, which can be a result of faulty hardware or software bugs. If the file(s) containing missing blocks represent transient data or can be recovered from an external source, then the easiest way is to remove (and potentially restore) them. Run fsck in order to determine which files have missing blocks. If you would like (highly appreciated) to further investigate the cause of data loss, then you can dig into NameNode and DataNode logs. From the logs one can track the entire life cycle of a particular block and its replicas.

3.16. If a block size of 64MB is used and a file is written that uses less than 64MB, will 64MB of disk space be consumed?

Short answer: No.
Longer answer: Since HDFS does not do raw disk block storage, there are two block sizes in use when writing a file in HDFS: the HDFS block size and the underlying file system's block size. HDFS will create files up to the size of the HDFS block size as well as a meta file that contains CRC32 checksums for that block. The underlying file system stores that file in increments of its block size on the actual raw disk, just as it would any other file.

4. Platform Specific

4.1. General

4.1.1. Problems building the C/C++ Code

While most of Hadoop is built using Java, a larger and growing portion is being rewritten in C and C++. As a result, the code portability between platforms is going down. Part of the problem is the lack of access to platforms other than Linux and our tendency to use specific BSD, GNU, or System V functionality in places where the POSIX-usage is non-existent, difficult, or non-performant.
That said, the biggest loss from not having the native compiled code will mostly be system performance and the security features present in newer releases of Hadoop. The other Hadoop features usually have Java analogs that work, albeit slower than their C cousins. The exception to this is security, which absolutely requires compiled code.

4.2. Mac OS X

4.2.1. Building on Mac OS X 10.6

Be aware that Apache Hadoop 0.22 and earlier require Apache Forrest to build the documentation. As of Snow Leopard, Apple no longer ships Java 1.5, which Apache Forrest requires. Installing Java 1.5 can be accomplished by either copying /System/Library/Frameworks/JavaVM.Framework/Versions/1.5 and 1.5.0 from a 10.5 machine or using a utility like Pacifist to install from an official Apple package. http://chxor.chxo.com/post/183013153/installing-java-1-5-on-snow-leopard provides some step-by-step directions.

4.3. Solaris

4.3.1. Why do files and directories show up as DrWho and/or user names are missing/weird?

Prior to 0.22, Hadoop uses the 'whoami' and id commands to determine the user and groups of the running process. whoami ships as part of the BSD compatibility package and is normally not in the path. The id command's output is System V-style whereas Hadoop expects POSIX. Two changes to the environment are required to fix this:
Make sure /usr/ucb/whoami is installed and in the path, either by including /usr/ucb at the tail end of the PATH environment or symlinking /usr/ucb/whoami directly.
In hadoop-env.sh, change the HADOOP_IDENT_STRING thusly:
export HADOOP_IDENT_STRING=`/usr/xpg4/bin/id -u -n`

4.3.2. Reported disk capacities are wrong

Hadoop uses du and df to determine disk space used. On pooled storage systems that report total capacity of the entire pool (such as ZFS) rather than the filesystem, Hadoop gets easily confused. Users have reported that using fixed quota sizes for HDFS and MapReduce directories helps eliminate a lot of this confusion.

4.4. Windows

4.4.1. Building / Testing Hadoop on Windows

The Hadoop build on Windows can be run from inside a Windows (not cygwin) command prompt window.
Whether you set environment variables in a batch file or in System->Properties->Advanced->Environment Variables, the following environment variables need to be set:
set ANT_HOME=c:\apache-ant-1.7.1
set JAVA_HOME=c:\jdk1.6.0.4
set PATH=%PATH%;%ANT_HOME%\bin

then open a command prompt window, cd to your workspace directory (in my case it is c:\workspace\hadoop) and run ant. Since I am interested in running the contrib test cases I do the following:
ant -l build.log -Dtest.output=yes test-contrib

Other targets work similarly. I just wanted to document this because I spent some time trying to figure out why the ant build would not run from a cygwin command prompt window. If you are building/testing on Windows, and haven't figured it out yet, this should get you started.

Hadoop interview questions

Name the most common InputFormats defined in Hadoop. Which one is the default?
The following three are the most common InputFormats defined in Hadoop:
  • TextInputFormat
  • KeyValueInputFormat
  • SequenceFileInputFormat
TextInputFormat is the Hadoop default.

What is the difference between TextInputFormat and KeyValueInputFormat class?

TextInputFormat: 

It reads lines of text files and provides the byte offset of the line as the key to the Mapper and the actual line as the value to the Mapper.

KeyValueInputFormat: 

Reads text files and parses lines into key/value pairs. Everything up to the first tab character is sent as the key to the Mapper and the remainder of the line is sent as the value to the Mapper.

What is InputSplit in Hadoop?

When a Hadoop job is run, it splits input files into chunks and assigns each split to a mapper to process. Each such chunk is called an InputSplit.

How is the splitting of file invoked in Hadoop Framework ?

It is invoked by the Hadoop framework by running the getSplits() method of the InputFormat class (like FileInputFormat) defined by the user.

Consider case scenario:
 In M/R system,
  • HDFS block size is 64 MB
  •  Input format is FileInputFormat
  •  We have 3 files of size 64KB, 65MB and 127MB

then how many input splits will be made by Hadoop framework?

Hadoop will make 5 splits as follows:
  • 1 split for the 64KB file
  • 2 splits for the 65MB file
  • 2 splits for the 127MB file

What is the purpose of RecordReader in Hadoop?

The InputSplit has defined a slice of work, but does not describe how to access it. The RecordReader class actually loads the data from its source and converts it into (key, value) pairs suitable for reading by the Mapper. The RecordReader instance is defined by the InputFormat

After the Map phase finishes, the hadoop framework does "Partitioning, Shuffle and sort". Explain what happens in this phase?

- Partitioning

Partitioning is the process of determining which reducer instance will receive which intermediate keys and values. Each mapper must determine for all of its output (key, value) pairs which reducer will receive them. It is necessary that for any key, regardless of which mapper instance generated it, the destination partition is the same

- Shuffle

After the first map tasks have completed, the nodes may still be performing several more map tasks each. But they also begin exchanging the intermediate outputs from the map tasks to where they are required by the reducers. This process of moving map outputs to the reducers is known as shuffling.

- Sort

Each reduce task is responsible for reducing the values associated with several intermediate keys. The set of intermediate keys on a single node is automatically sorted by Hadoop before they are presented to the Reducer

If no custom partitioner is defined in Hadoop, then how is data partitioned before it is sent to the reducer?

The default partitioner computes a hash value for the key and assigns the partition based on this result

What is a Combiner?

The Combiner is a "mini-reduce" process which operates only on data generated by a mapper. The Combiner will receive as input all data emitted by the Mapper instances on a given node. The output from the Combiner is then sent to the Reducers, instead of the output from the Mappers.

What is job tracker?

Job Tracker is the service within Hadoop that runs Map Reduce jobs on the cluster

What are some typical functions of Job Tracker?

The following are some typical tasks of Job Tracker
- Accepts jobs from clients
- It talks to the NameNode to determine the location of the data
- It locates TaskTracker nodes with available slots at or near the data
- It submits the work to the chosen Task Tracker nodes and monitors progress of each task by receiving heartbeat signals from Task tracker

What is task tracker?

Task Tracker is a node in the cluster that accepts tasks - Map, Reduce and Shuffle operations - from a JobTracker.

Whats the relationship between Jobs and Tasks in Hadoop?

One job is broken down into one or many tasks in Hadoop.

Suppose Hadoop spawned 100 tasks for a job and one of the tasks failed. What will Hadoop do?
It will restart the task on some other task tracker, and only if the task fails more than 4 times (the default setting, which can be changed) will it kill the job.

Hadoop achieves parallelism by dividing the tasks across many nodes, but it is possible for a few slow nodes to rate-limit the rest of the program and slow it down. What mechanism does Hadoop provide to combat this?

Speculative Execution

How does speculative execution work in Hadoop ?

Job tracker makes different task trackers process the same input. When tasks complete, they announce this fact to the Job Tracker. Whichever copy of a task finishes first becomes the definitive copy. If other copies were executing speculatively, Hadoop tells the Task Trackers to abandon the tasks and discard their outputs. The Reducers then receive their inputs from whichever Mapper completed successfully first.

Using command line in Linux, how will you
- see all jobs running in the hadoop cluster
- kill a job
- hadoop job -list
- hadoop job -kill jobid

What is Hadoop Streaming ?

Streaming is a generic API that allows programs written in virtually any language to be used as Hadoop Mapper and Reducer implementations

What is the characteristic of the Streaming API that makes it flexible enough to run MapReduce jobs in languages like Perl, Ruby, Awk, etc.?

Hadoop Streaming allows the use of arbitrary programs for the Mapper and Reducer phases of a MapReduce job by having both Mappers and Reducers receive their input on stdin and emit output (key, value) pairs on stdout.

What is Distributed Cache in Hadoop ?

Distributed Cache is a facility provided by the Map/Reduce framework to cache files (text, archives, jars and so on) needed by applications during execution of the job. The framework will copy the necessary files to the slave node before any tasks for the job are executed on that node.

What is the benefit of Distributed Cache? Why can't we just have the file in HDFS and have the application read it?

This is because the Distributed Cache is much faster. It copies the file to all task trackers at the start of the job. Now if a task tracker runs 10 or 100 mappers or reducers, they will all use the same local copy from the Distributed Cache. On the other hand, if you write code in the MR job to read the file from HDFS, then every mapper will access it from HDFS, so if a task tracker runs 100 map tasks it will read this file 100 times from HDFS. HDFS is also not very efficient when used like this.

What mechanism does the Hadoop framework provide to synchronize changes made in the Distributed Cache during runtime of the application?

This is a trick question. There is no such mechanism. The Distributed Cache by design is read-only during the time of job execution.

Have you ever used Counters in Hadoop? Give us an example scenario.

Anybody who claims to have worked on a Hadoop project is expected to have used counters.
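As a hedged illustration (old mapred API; the class name, the counter enum, and the tab-separated input format are all assumptions), a mapper might count good versus malformed records so that the totals show up with the job's counters in the JobTracker web UI:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class ValidatingMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

    enum RecordQuality { GOOD, MALFORMED } // hypothetical counter group

    public void map(LongWritable offset, Text line,
                    OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        String[] fields = line.toString().split("\t");
        if (fields.length < 2) {
            reporter.incrCounter(RecordQuality.MALFORMED, 1); // count and skip bad records
            return;
        }
        reporter.incrCounter(RecordQuality.GOOD, 1);
        output.collect(new Text(fields[0]), new Text(fields[1]));
    }
}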

Is it possible to provide multiple inputs to Hadoop? If yes, then how can you give multiple directories as input to the Hadoop job?
Yes, the input format class provides methods to add multiple directories as input to a Hadoop job.
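For example, a minimal sketch using the old mapred API (the driver class and paths are hypothetical):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class MultiInputJob {
    public static JobConf configureInputs() {
        JobConf conf = new JobConf(MultiInputJob.class);
        // Add each input directory individually; FileInputFormat.setInputPaths(conf, path1, path2)
        // would set them all in a single call instead.
        FileInputFormat.addInputPath(conf, new Path("/data/logs/2015-08-27")); // hypothetical paths
        FileInputFormat.addInputPath(conf, new Path("/data/logs/2015-08-28"));
        return conf;
    }
}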

Is it possible to have Hadoop job output in multiple directories? If yes, then how?

Yes, by using the MultipleOutputs class.

What will a hadoop job do if you try to run it with an output directory that is already present? Will it

- overwrite it
- warn you and continue
- throw an exception and exit
The hadoop job will throw an exception and exit.

How can you set an arbitrary number of mappers to be created for a job in Hadoop ?

This is a trick question. You cannot set it directly; the number of map tasks is determined by the number of input splits.

How can you set an arbitrary number of reducers to be created for a job in Hadoop ?
You can either do it programmatically by using the setNumReduceTasks method of the JobConf class or set it up as a configuration setting.
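For instance, a minimal sketch (old mapred API; the driver class name is hypothetical):

import org.apache.hadoop.mapred.JobConf;

public class ReducerCountExample {
    public static JobConf withTenReducers() {
        JobConf conf = new JobConf(ReducerCountExample.class);
        conf.setNumReduceTasks(10); // equivalent to setting mapred.reduce.tasks=10
        return conf;
    }
}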

How will you write a custom partitioner for a Hadoop job ?

To have Hadoop use a custom partitioner you will have to do at a minimum the following three things (see the sketch after this list):
- Create a new class that extends the Partitioner class
- Override the getPartition method
- In the wrapper that runs the MapReduce job, either
- add the custom partitioner to the job programmatically using the setPartitionerClass method, or
- add the custom partitioner to the job as a config file (if your wrapper reads from a config file or Oozie)
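A minimal sketch of such a partitioner (new org.apache.hadoop.mapreduce API; the class name and the "US_" key prefix are purely illustrative conventions, not anything defined by Hadoop):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes keys starting with "US_" to reducer 0 and hashes everything else across
// the remaining reducers. Registered with job.setPartitionerClass(RegionPartitioner.class).
public class RegionPartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (numPartitions == 1 || key.toString().startsWith("US_")) {
            return 0;
        }
        return 1 + (key.hashCode() & Integer.MAX_VALUE) % (numPartitions - 1);
    }
}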

How did you debug your Hadoop code ?

There can be several ways of doing this but most common ways are
- By using counters
- The web interface provided by Hadoop framework

Did you ever build a production process in Hadoop? If yes, then what was the process when your Hadoop job failed due to any reason?

It's an open-ended question, but most candidates, if they have written a production job, should talk about some type of alert mechanism, like an email being sent or their monitoring system sending an alert. Since Hadoop works on unstructured data, it's very important to have a good alerting system for errors, since unexpected data can very easily break the job.

Did you ever run into a lopsided job that resulted in an out of memory error? If yes, then how did you handle it?

This is an open-ended question, but a candidate who claims to be an intermediate developer and has worked on a large data set (10-20GB min) should have run into this problem. There can be many ways to handle this problem, but the most common way is to alter your algorithm and break down the job into more map reduce phases, or to use a combiner if possible.

What is HDFS?

HDFS, the Hadoop Distributed File System, is a distributed file system designed to hold very large amounts of data (terabytes or even petabytes) and provide high-throughput access to this information. Files are stored in a redundant fashion across multiple machines to ensure their durability against failure and high availability to highly parallel applications.

What does the statement "HDFS is a block-structured file system" mean?

It means that in HDFS individual files are broken into blocks of a fixed size. These blocks are stored across a cluster of one or more machines with data storage capacity

What does the term "Replication factor" mean?

Replication factor is the number of times a file needs to be replicated in HDFS

What is the default replication factor in HDFS?

3

What is the default block size of an HDFS block?

64MB

What is the benefit of having such a big block size (when compared to the block size of a Linux file system like ext)?

It allows HDFS to decrease the amount of metadata storage required per file (the list of blocks per file will be smaller as the size of individual blocks increases). Furthermore, it allows for fast streaming reads of data, by keeping large amounts of data sequentially laid out on the disk

Why is it recommended to have few very large files instead of a lot of small files in HDFS?

This is because the NameNode contains the metadata of each and every file in HDFS, and more files means more metadata. Since the NameNode loads all the metadata into memory for speed, having a lot of files may make the metadata information big enough to exceed the size of the memory on the NameNode.

True/false question. What is the lowest granularity at which you can apply a replication factor in HDFS?
- You can choose replication factor per directory
- You can choose replication factor per file in a directory
- You can choose replication factor per block of a file
- True
- True
- False
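As an aside to the answers above, per-file replication can be changed through the FileSystem API (or with hadoop fs -setrep); a minimal Java sketch with a hypothetical path:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SetReplicationExample {
    public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        // Raise the replication factor of a single file to 5 - per-file granularity.
        fs.setReplication(new Path("/user/hadoop/important.dat"), (short) 5); // hypothetical path
    }
}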

What is a datanode in HDFS?

Individual machines in the HDFS cluster that hold blocks of data are called datanodes

What is a Namenode in HDFS?

The Namenode stores all the metadata for the file system

What alternate way does HDFS provide to recover data in case a Namenode, without backup, fails and cannot be recovered?

There is no way. If Namenode dies and there is no backup then there is no way to recover data

Describe how an HDFS client will read a file in HDFS. Will it talk to the DataNode or the NameNode? How will the data flow?

To open a file, a client contacts the Name Node and retrieves a list of locations for the blocks that comprise the file. These locations identify the Data Nodes which hold each block. Clients then read file data directly from the Data Node servers, possibly in parallel. The Name Node is not directly involved in this bulk data transfer, keeping its overhead to a minimum.
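A minimal client-side sketch of that read path using the Java FileSystem API (the NameNode lookup and the DataNode streaming happen behind FileSystem.open):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsCat {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = fs.open(new Path(args[0])); // path passed on the command line
        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line); // bytes are streamed directly from DataNodes
        }
        reader.close();
    }
}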

Using the Linux command line, how will you
- List the number of files in an HDFS directory
- Create a directory in HDFS
- Copy file from your local directory to HDFS
hadoop fs -ls
hadoop fs -mkdir
hadoop fs -put localfile hdfsfile

Advantages of Hadoop?

  • Bringing compute and storage together on commodity hardware: The result is blazing speed at low cost.
  •  Price performance: The Hadoop big data technology provides significant cost savings (think a factor of approximately 10) with significant performance improvements (again, think factor of 10). Your mileage may vary. If the existing technology can be so dramatically trounced, it is worth examining if Hadoop can complement or replace aspects of your current architecture.
  •  Linear Scalability: Every parallel technology makes claims about scale up. Hadoop has genuine scalability since the latest release is expanding the limit on the number of nodes to beyond 4,000.
  •  Full access to unstructured data: A highly scalable data store with a good parallel programming model, MapReduce, has been a challenge for the industry for some time. Hadoop programming model does not solve all problems, but it is a strong solution for many tasks.

Definition of Big data?

According to Gartner, Big data can be defined as high volume, velocity and variety information requiring innovative and cost effective forms of information processing for enhanced decision making.

How Big data differs from database ?

Datasets which are beyond the ability of a database to store, analyze and manage can be defined as Big. Big Data technology extracts the required information from large volumes, whereas the storage area of a database is limited.

Who are all using Hadoop? Give some examples?

  •  A9.com
  • Amazon
  •  Adobe
  • AOL
  •  Baidu
  • Cooliris
  •  Facebook
  •  NSF-Google
  •  IBM
  •  LinkedIn
  •  Ning
  •  PARC
  •  Rackspace
  •  StumbleUpon
  • Twitter
  •  Yahoo!

Pig for Hadoop - Give some points?

Pig is a data-flow-oriented language for analyzing large data sets.
It is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turn enables them to handle very large data sets.

At the present time, Pig's infrastructure layer consists of a compiler that produces sequences of Map-Reduce programs, for which large-scale parallel implementations already exist (e.g., the Hadoop subproject). Pig's language layer currently consists of a textual language called Pig Latin, which has the following key properties:

Ease of programming.

It is trivial to achieve parallel execution of simple, "embarrassingly parallel" data analysis tasks. Complex tasks comprised of multiple interrelated data transformations are explicitly encoded as data flow sequences, making them easy to write, understand, and maintain.

Optimization opportunities.

The way in which tasks are encoded permits the system to optimize their execution automatically, allowing the user to focus on semantics rather than efficiency.

Extensibility.

Users can create their own functions to do special-purpose processing.

Features of Pig:

– data transformation functions
– datatypes include sets, associative arrays, tuples
– high-level language for marshalling data
– developed at Yahoo!

Hive for Hadoop - Give some points?

Hive is a data warehouse system for Hadoop that facilitates easy data summarization, ad-hoc queries, and the analysis of large datasets stored in Hadoop compatible file systems. Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL. At the same time this language also allows traditional map/reduce programmers to plug in their custom mappers and reducers when it is inconvenient or inefficient to express this logic in HiveQL.

Key points:

  • SQL-based data warehousing application
– features similar to Pig
– more strictly SQL-like
  • Supports SELECT, JOIN, GROUP BY, etc.
  • Analyzing very large data sets
– log processing, text mining, document indexing
  • Developed at Facebook

Map Reduce in Hadoop?

MapReduce:

It is a framework for processing huge datasets in parallel across a large number of computers, referred to as a cluster. It involves two processes, namely Map and Reduce.

Map Process:

In this process the input is taken by the master node, which divides it into smaller sub-tasks and distributes them to the worker nodes. The worker nodes process these sub-tasks and pass the results back to the master node.

Reduce Process:

In this process the master node combines all the answers provided by the worker nodes to produce the result of the original task. The main advantage of MapReduce is that the map and reduce steps are performed in a distributed fashion: since each map operation is independent, the maps can run in parallel, which reduces the total computing time.
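
To make the two steps concrete, here is a minimal word-count sketch (the classic illustrative example, not something described in this article) written against the org.apache.hadoop.mapreduce API; the class and field names are ours:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

    // Map step: each input line is split into words, and each word is emitted
    // with a count of 1. These (word, 1) pairs are the intermediate key-value pairs.
    public static class TokenizerMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    // Reduce step: all counts for the same word arrive together and are summed.
    public static class SumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable count : values) {
                sum += count.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}

Because every (word, 1) pair is independent, the map work can be spread across the cluster, and the reducer only has to sum the counts that the framework groups by key.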

What is a heartbeat in HDFS?

A heartbeat is a signal indicating that a node is alive. A DataNode sends heartbeats to the NameNode, and a TaskTracker sends heartbeats to the JobTracker. If the NameNode or JobTracker does not receive heartbeats, it concludes that the DataNode has a problem or that the TaskTracker is unable to perform its assigned task.

What is metadata?

Metadata is the information about the data stored in the DataNodes, such as the location of a file, the size of the file, and so on.

Is Namenode also a commodity?

No. The NameNode can never be commodity hardware because the entire HDFS relies on it.
It is the single point of failure in HDFS, so the NameNode has to be a highly reliable, high-availability machine.

Can Hadoop be compared to a NoSQL database like Cassandra?

Though NoSQL is the closest technology that can be compared to Hadoop, it has its own pros and cons. There is no distributed filesystem in NoSQL. Hadoop is not a database; it is a filesystem (HDFS) plus a distributed programming framework (MapReduce).

What is Key value pair in HDFS?

A key-value pair is the intermediate data generated by the map tasks and sent to the reduce tasks to generate the final output.

What is the difference between MapReduce engine and HDFS cluster?

The HDFS cluster is the name given to the whole configuration of master and slave nodes where the data is stored. The MapReduce engine is the programming framework used to retrieve and analyze that data.

What is a rack?

A rack is a physical collection of DataNodes stored together at a single location; it is essentially a storage area with a group of DataNodes put together. A cluster's DataNodes can be spread across different racks, and there can be multiple racks in a single location.

How indexing is done in HDFS?

Hadoop has its own way of locating data rather than a traditional index. Data is stored block by block according to the block size, and the NameNode keeps metadata recording which blocks make up each file and where each block resides, so any part of a file can be reached by following this block map. This block-level bookkeeping is the basis of HDFS.

History of Hadoop?

Hadoop was created by Doug Cutting, the creator of Apache Lucene, the widely used text search library. Hadoop has its origins in Apache Nutch, an open source web search engine, itself a part of the Lucene project.

The name Hadoop is not an acronym; it’s a made-up name. The project’s creator, Doug Cutting, explains how the name came about:
The name my kid gave a stuffed yellow elephant. Short, relatively easy to spell and pronounce, meaningless, and not used elsewhere: those are my naming criteria.

Subprojects and “contrib” modules in Hadoop also tend to have names that are unrelated to their function, often with an elephant or other animal theme (“Pig,” for example). Smaller components are given more descriptive (and therefore more mundane) names. This is a good principle, as it means you can generally work out what something does from its name. For example, the jobtracker keeps track of MapReduce jobs.

What is meant by Volunteer Computing?

Volunteer computing projects work by breaking the problem they are trying to solve into chunks called work units, which are sent to computers around the world to be analyzed.
SETI@home is the most well-known of many volunteer computing projects.

How Hadoop differs from SETI (Volunteer computing)?

Although SETI (Search for Extra-Terrestrial Intelligence) may be superficially similar to MapReduce (breaking a problem into independent pieces to be worked on in parallel), there are some significant differences. The SETI@home problem is very CPU-intensive, which makes it suitable for running on hundreds of thousands of computers across the world, since the time to transfer a work unit is dwarfed by the time to run the computation on it. Volunteers are donating CPU cycles, not bandwidth.

MapReduce is designed to run jobs that last minutes or hours on trusted, dedicated hardware running in a single data center with very high aggregate bandwidth interconnects. By contrast, SETI@home runs a perpetual computation on untrusted machines on the Internet with highly variable connection speeds and no data locality.

Compare RDBMS and MapReduce?

Data size:

RDBMS - Gigabytes
MapReduce - Petabytes

Access:

RDBMS - Interactive and batch
MapReduce - Batch

Updates:

RDBMS - Read and write many times
MapReduce - Write once, read many times

Structure:

RDBMS - Static schema
MapReduce - Dynamic schema

Integrity:

RDBMS - High
MapReduce - Low

Scaling:

RDBMS - Nonlinear
MapReduce - Linear

What is HBase?

A distributed, column-oriented database. HBase uses HDFS for its underlying storage, and supports both batch-style computations using MapReduce and point queries (random reads).

What is ZooKeeper?

A distributed, highly available coordination service. ZooKeeper provides primitives such as distributed locks that can be used for building distributed applications.

What is Chukwa?

A distributed data collection and analysis system. Chukwa runs collectors that store data in HDFS, and it uses MapReduce to produce reports. (At the time of this writing, Chukwa had only recently graduated from a “contrib” module in Core to its own subproject.)

What is Avro?

A data serialization system for efficient, cross-language RPC, and persistent data storage. (At the time of this writing, Avro had been created only as a new subproject, and no other Hadoop subprojects were using it yet.)

core subproject in Hadoop - What is it?

A set of components and interfaces for distributed filesystems and general I/O (serialization, Java RPC, persistent data structures).

What are all Hadoop subprojects?

Pig, Chukwa, Hive, HBase, MapReduce, HDFS, ZooKeeper, Core, Avro

What is a split?

Hadoop divides the input to a MapReduce job into fixed-size pieces called input splits, or just splits. Hadoop creates one map task for each split, which runs the user-defined map function for each record in the split.

Having many splits means the time taken to process each split is small compared to the time to process the whole input. So if we are processing the splits in parallel, the processing is better load-balanced.

On the other hand, if splits are too small, then the overhead of managing the splits and of map task creation begins to dominate the total job execution time. For most jobs, a good split size tends to be the size of an HDFS block, 64 MB by default, although this can be changed for the cluster.
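
As a sketch of how this can be tuned, assuming the Hadoop 2.x mapreduce API, a job can set lower and upper bounds on the split size; the job name and sizes below are placeholders and the rest of the job setup is omitted:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "split-size-demo"); // placeholder job name

        // By default each HDFS block becomes one split; these calls bound the
        // split size if a different granularity is wanted for this job.
        FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // 64 MB
        FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);  // 128 MB
    }
}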

Map tasks write their output to local disk, not to HDFS. Why is this?

Map output is intermediate output: it’s processed by reduce tasks to produce the final output, and once the job is complete the map output can be thrown away. So storing it in HDFS, with replication, would be overkill. If the node running the map task fails before the map output has been consumed by the reduce task, then Hadoop will automatically rerun the map task on another node to recreate the map output.

MapReduce data flow with a single reduce task- Explain?

The input to a single reduce task is normally the output from all mappers.
The sorted map outputs have to be transferred across the network to the node where the reduce task is running, where they are merged and then passed to the user-defined reduce function. The output of the reduce is normally stored in HDFS for reliability.
For each HDFS block of the reduce output, the first replica is stored on the local node, with other replicas being stored on off-rack nodes.

MapReduce data flow with multiple reduce tasks- Explain?

When there are multiple reducers, the map tasks partition their output, each creating one partition for each reduce task. There can be many keys (and their associated values) in each partition, but the records for every key are all in a single partition. The partitioning can be controlled by a user-defined partitioning function, but normally the default partitioner, which buckets keys using a hash function, works well.
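
A custom partitioner might look like the following sketch; the Text/IntWritable key and value types and the first-letter bucketing rule are purely illustrative:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative custom partitioner: routes keys to reducers by their first
// character, so all keys starting with the same letter land in the same
// partition (and therefore the same reducer).
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.getLength() == 0) {
            return 0;
        }
        char first = Character.toLowerCase(key.toString().charAt(0));
        return first % numPartitions; // char values are non-negative
    }
}

It would be registered on the job with job.setPartitionerClass(FirstLetterPartitioner.class); otherwise the hash-based default partitioner is used.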

MapReduce data flow with no reduce tasks- Explain?

It’s also possible to have zero reduce tasks. This can be appropriate when you don’t need the shuffle, since the processing can be carried out entirely in parallel.
In this case, the only off-node data transfer occurs when the map tasks write their output to HDFS.

What is a block in HDFS?

Filesystems deal with data in blocks, which are an integral multiple of the disk block size. Filesystem blocks are typically a few kilobytes in size, while disk blocks are normally 512 bytes. HDFS, too, has the concept of a block, but it is a much larger unit: 64 MB by default. Files in HDFS are broken into block-sized chunks, which are stored as independent units.

Why is a Block in HDFS So Large?

HDFS blocks are large compared to disk blocks, and the reason is to minimize the cost of seeks. By making a block large enough, the time to transfer the data from the disk can be made to be significantly larger than the time to seek to the start of the block. Thus the time to transfer a large file made of multiple blocks operates at the disk transfer rate.

File permissions in HDFS?

HDFS has a permissions model for files and directories.
There are three types of permission: the read permission (r), the write permission (w), and the execute permission (x). The read permission is required to read files or list the contents of a directory. The write permission is required to write a file, or, for a directory, to create or delete files or directories in it. The execute permission is ignored for a file, since you can’t execute a file on HDFS; for a directory, it is required to access its children.
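
For illustration, permissions can also be inspected and changed programmatically through the Java FileSystem API; this is a minimal sketch with a placeholder path, and it assumes the caller is the file's owner or a superuser:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class HdfsPermissionExample {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path file = new Path("/user/example/report.txt"); // placeholder path

        // rw-r----- : the owner may read/write, the group may read, others get no access.
        fs.setPermission(file, new FsPermission((short) 0640));

        // FileStatus exposes the current permission, owner, and group.
        System.out.println(fs.getFileStatus(file).getPermission());
    }
}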

What is Thrift in HDFS?

The Thrift API in the “thriftfs” contrib module exposes Hadoop filesystems as an Apache Thrift service, making it easy for any language that has Thrift bindings to interact with a Hadoop filesystem, such as HDFS.
To use the Thrift API, run a Java server that exposes the Thrift service, and acts as a proxy to the Hadoop filesystem. Your application accesses the Thrift service, which is typically running on the same machine as your application.

How Hadoop interacts with C?

Hadoop provides a C library called libhdfs that mirrors the Java FileSystem interface.
It works using the Java Native Interface (JNI) to call a Java filesystem client.
The C API is very similar to the Java one, but it typically lags the Java one, so newer features may not be supported. You can find the generated documentation for the C API in the libhdfs/docs/api directory of the Hadoop distribution.

What is FUSE in HDFS Hadoop?

Filesystem in Userspace (FUSE) allows filesystems that are implemented in user space to be integrated as a Unix filesystem. Hadoop’s Fuse-DFS contrib module allows any Hadoop filesystem (but typically HDFS) to be mounted as a standard filesystem. You can then use Unix utilities (such as ls and cat) to interact with the filesystem.
Fuse-DFS is implemented in C using libhdfs as the interface to HDFS. Documentation for compiling and running Fuse-DFS is located in the src/contrib/fuse-dfs directory of the Hadoop distribution.

Explain WebDAV in Hadoop?

WebDAV is a set of extensions to HTTP to support editing and updating files. WebDAV shares can be mounted as filesystems on most operating systems, so by exposing HDFS (or other Hadoop filesystems) over WebDAV, it’s possible to access HDFS as a standard filesystem.

What is Sqoop in Hadoop?

It is a tool designed to transfer data between relational database management systems (RDBMS) and Hadoop HDFS.
Thus, we can sqoop data from an RDBMS such as MySQL or Oracle into HDFS, as well as export data from HDFS back into an RDBMS.
Sqoop reads the table row by row and performs the import in parallel, so the output may consist of multiple files.
Example (connection string, table, filter, and target directory are placeholders):
sqoop import --connect jdbc:mysql://dbhost/mydatabase \
  --table mytable --where "condition" \
  --target-dir /path/in/hdfs

Monday, August 24, 2015

Map Reduce Scheduling information

JobTracker and TaskTracker: the MapReduce engine

Above the Hadoop distributed file system sits the MapReduce engine, which consists of one JobTracker, to which client applications submit MapReduce jobs. The JobTracker pushes work out to available TaskTracker nodes in the cluster, striving to keep the work as close to the data as possible. With a rack-aware file system, the JobTracker knows which node contains the data, and which other machines are nearby. If the work cannot be hosted on the actual node where the data resides, priority is given to nodes in the same rack. This reduces network traffic on the main backbone network. If a TaskTracker fails or times out, that part of the job is rescheduled. The TaskTracker on each node spawns a separate Java Virtual Machine process to prevent the TaskTracker itself from failing if the running job crashes the JVM. A heartbeat is sent from the TaskTracker to the JobTracker periodically to check its status. The JobTracker and TaskTracker status and information are exposed by Jetty and can be viewed from a web browser.

If the JobTracker failed on Hadoop 0.20 or earlier, all ongoing work was lost. Hadoop version 0.21 added some checkpointing to this process; the JobTracker records what it is up to in the file system. When a JobTracker starts up, it looks for any such data, so that it can restart work from where it left off.

Known limitations of this approach are:

  • The allocation of work to TaskTrackers is very simple. Every TaskTracker has a number of available slots (such as "4 slots"). Every active map or reduce task takes up one slot. The JobTracker allocates work to the tracker nearest to the data with an available slot. There is no consideration of the current system load of the allocated machine, and hence of its actual availability.
  • If one TaskTracker is very slow, it can delay the entire MapReduce job – especially towards the end of a job, where everything can end up waiting for the slowest task. With speculative execution enabled, however, a single task can be executed on multiple slave nodes.

Scheduling:

By default Hadoop uses FIFO scheduling, with 5 optional scheduling priorities, to schedule jobs from a work queue. In version 0.19 the job scheduler was refactored out of the JobTracker, which added the ability to plug in an alternate scheduler (such as the Fair Scheduler or the Capacity Scheduler).

Fair scheduler:

The fair scheduler was developed by Facebook. The goal of the fair scheduler is to provide fast response times for small jobs and QoS for production jobs. The fair scheduler has three basic concepts.
  • Jobs are grouped into Pools.
  • Each pool is assigned a guaranteed minimum share.
  • Excess capacity is split between jobs.
By default, jobs that are uncategorized go into a default pool. Pools can be given a guaranteed minimum number of map slots and reduce slots, as well as a limit on the number of running jobs.

Capacity scheduler

The capacity scheduler was developed by Yahoo. The capacity scheduler supports several features that are similar to the fair scheduler.
  • Jobs are submitted into queues.
  • Queues are allocated a fraction of the total resource capacity.
  • Free resources are allocated to queues beyond their total capacity.
  • Within a queue a job with a high level of priority has access to the queue's resources.
There is no preemption once a job is running.

Other applications

The HDFS file system is not restricted to MapReduce jobs. It can be used for other applications, many of which are under development at Apache. The list includes the HBase database, the Apache Mahout machine learning system, and the Apache Hive data warehouse system. Hadoop can in theory be used for any sort of work that is batch-oriented rather than real-time, that is very data-intensive, and that is able to work on pieces of the data in parallel. As of October 2009, commercial applications of Hadoop included:
  • Log and/or clickstream analysis of various kinds
  • Marketing analytics
  • Machine learning and/or sophisticated data mining
  • Image processing
  • Processing of XML messages
  • Web crawling and/or text processing
  • General archiving, including of relational/tabular data, e.g. for compliance

Cloudera Certification Practice Test

Cloudera Certification Practice Test Exam with answers

1. You have decided to develop a custom composite key class that you will use for keys emitted during the map phase to be consumed by a reducer. Which interface must this key implement?
Correct Answer: WritableComparable

Explanation

All keys and values must implement the Writable interface so that they can be written to disk. In addition, keys emitted during the map phase must implement WritableComparable so that the keys can be sorted during the shuffle and sort phase. WritableComparable inherits from Writable.
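
For illustration, a custom composite key might look like the following sketch; the (customerId, timestamp) fields are purely hypothetical:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Illustrative composite key: (customerId, timestamp). write()/readFields()
// satisfy Writable so the key can be serialized, and compareTo() makes the
// key sortable during the shuffle and sort phase.
public class CustomerTimeKey implements WritableComparable<CustomerTimeKey> {
    private long customerId;
    private long timestamp;

    public CustomerTimeKey() { }  // no-arg constructor required for deserialization

    public CustomerTimeKey(long customerId, long timestamp) {
        this.customerId = customerId;
        this.timestamp = timestamp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(customerId);
        out.writeLong(timestamp);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        customerId = in.readLong();
        timestamp = in.readLong();
    }

    @Override
    public int compareTo(CustomerTimeKey other) {
        int byCustomer = Long.compare(customerId, other.customerId);
        return byCustomer != 0 ? byCustomer : Long.compare(timestamp, other.timestamp);
    }

    @Override
    public int hashCode() {
        return (int) (customerId * 163 + timestamp);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CustomerTimeKey)) {
            return false;
        }
        CustomerTimeKey other = (CustomerTimeKey) o;
        return customerId == other.customerId && timestamp == other.timestamp;
    }
}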

Further Reading

  • For more information, see the Yahoo! Developer Network Apache Hadoop Tutorial, Custom Key Types.
  • Hadoop: the Definitive Guide, chapter four, in the Serialization: The Writable Interface section.

2. You’ve built a MapReduce job that denormalizes a very large table, resulting in an extremely large amount of output data. Which two cluster resources will your job stress? (Choose two).
Correct Answer: network I/O , disk I/O

Explanation

When denormalizing a table, the amount of data written by the map phase will far exceed the amount of data read by the map phase. All of the data written during the map phase is first written to local disk and then transferred over the network to the reducers during the beginning of the reduce phase. Writing a very large amount of data in the map phase will therefore create a large amount of local disk I/O on the machines running map tasks, as well as network I/O. Because map output is stored in a fixed-size buffer that is written periodically to disk, this operation will not tax the memory of the machines running the map tasks. Denormalizing a table is not a compute-intensive operation, so it will not tax the processors of the machines running the map tasks.

Further Reading

  • Hadoop: the Definitive Guide, chapter six, Shuffle and Sort: The Map Side section includes more information on the process for writing map output to disk.
  • Hadoop: the Definitive Guide, chapter six, Shuffle and Sort: The Reduce Side section explains further how data is transferred to the reducers
  • Denormalizing a table is a form of join operation. You can read more about performing joins in MapReduce in Join Algorithms using Map/Reduce

3. When is the earliest that the reduce() method of any reduce task in a given job called?
Correct Answer: Not until all map tasks have completed

Explanation

No reduce task’s reduce() method is called until all map tasks have completed. Every reduce task’s reduce() method expects to receive its data in sorted order. If the reduce() method were called before all of the map tasks had completed, it would be possible for the reduce() method to receive the data out of order.

Further Reading

  • Hadoop: The Definitive Guide, chapter six includes a detailed explanation of the shuffle and sort phase of a MapReduce job.

4. You have 50 files in the directory /user/foo/example. Each file is 300MB. You submit a MapReduce job with /user/foo/example as the input path.

How much data does a single Mapper process as this job executes?

Correct Answer: A single input split

Explanation

An input split is a unit of work (a chunk of data) that is processed by a single map task in a MapReduce program (represented by the Java interface InputSplit). The InputFormat you specify for the MapReduce program determines how input files are split into records and read. Each map task processes a single split; each split is further composed of records (key-value pairs), which the map task processes.

A MapReduce program run over a data set is usually called a MapReduce “job.” By splitting up input files, MapReduce can process a single file in parallel; if the file is very large, this represents a significant performance improvement. Also, because a single file is worked on in splits, it allows MapReduce to schedule those processes on different nodes in the cluster, nodes that have that piece of data already locally stored on that node, which also results in significant performance improvements. An InputSplit can span HDFS block boundaries.

Further Reading

  • Hadoop: The Definitive Guide, chapter two includes an excellent general discussion of input splits

Hadoop Administrator

5. In the execution of a MapReduce job, where does the Mapper place the intermediate data of each Map task?

Correct Answer: The Mapper stores the intermediate data on the underlying filesystem of the local disk of the machine which ran the Map task

Explanation

Intermediate data from a Mapper is stored on the local filesystem of the machine on which the Mapper ran. Each Reducer then copies its portion of that intermediate data to its own local disk. When the job terminates, all intermediate data is removed.
  • See Hadoop: The Definitive Guide, 3rd Edition, Chapter 2, under the section "Data Flow"

6. A client wishes to read a file from HDFS. To do that, it needs the block IDs (their names) which make up the file. From where does it obtain those block IDs?
Correct Answer: The NameNode reads the block IDs from memory and returns them to the client.

Explanation

When a client wishes to read a file from HDFS, it contacts the NameNode and requests the names and locations of the blocks which make up the file. For rapid access, the NameNode has the block IDs stored in RAM.

Further Reading

See Hadoop Operations, under the section "The Read Path."

7. Your cluster has slave nodes in three different racks, and you have written a rack topology script identifying each machine as being in rack1, rack2, or rack3. A client machine outside of the cluster writes a small (one-block) file to HDFS. The first replica of the block is written to a node on rack2. How is block placement determined for the other two replicas?
Correct Answer: Either both will be written to nodes on rack1, or both will be written to nodes on rack3.

Explanation

For the default threefold replication, Hadoop’s rack placement policy is to write the first copy of a block on a node in one rack, then the other two copies on two nodes in a different rack. Since the first copy is written to rack2, the other two will either be written to two nodes on rack1, or two nodes on rack3.

Further Reading

  • For more information on rack topology and block placement policy, see Hadoop Operations: A Guide for Developers and Administrators page 130.
  • See the Apache Docs on Data Replication

8. A slave node in your cluster has 24GB of RAM, and 12 physical processor cores on hyperthreading-enabled processors. You set the value of mapred.child.java.opts to -Xmx1G, and the value of mapred.tasktracker.map.tasks.maximum to 12. What is the most appropriate value to set for mapred.tasktracker.reduce.tasks.maximum?
Correct Answer: 6

Explanation

For optimal performance, it is important to avoid the use of virtual memory (swapping) on slave nodes. From the configuration shown, the slave node could run 12 Map tasks simultaneously, and each one will use 1GB of RAM, resulting in a total of 12GB used. The TaskTracker daemon itself will use 1GB of RAM, as will the DataNode daemon. This is a total of 14GB. The operating system will also use some RAM -- a reasonable estimate would be 1-2GB. Thus, we can expect to have approximately 8-9GB of RAM available for Reducers. So the most appropriate of the choices presented is that we should configure the node to be able to run 6 Reducers simultaneously.

Further Reading

Hadoop: The Definitive Guide, 3rd Edition, Chapter 9, under the section “Environment Settings”

HBase

9. Your client application needs to scan a region for the row key value 104. Given a store file that contains the following list of Row Key values:

100, 101, 102, 103, 104, 105, 106, 107

What would a Bloom filter return?
Correct Answer: Confirmation that 104 may be contained in the set

Explanation

A Bloom filter is a kind of probabilistic membership test -- it tells you whether an element may be a member of a set. It is quick and memory-efficient. The trade-off is that it is probabilistic: false positives are possible, though false negatives are not. Thus, if the Bloom filter returns true, it confirms that a key may be contained in the table; if it returns false, the key is definitely not contained in the table.

Enabling Bloom filters may save disk seeks and improve read latency.

Further Reading

  • Wikipedia on Bloom Filters
  • HBase Documentation on Bloom Filters section 12.6.4. Bloom Filters includes:
Bloom Filters can be enabled per ColumnFamily. Use HColumnDescriptor.setBloomFilterType(NONE | ROW | ROWCOL) to enable blooms per ColumnFamily. Default = NONE for no bloom filters. If ROW, the hash of the row will be added to the bloom on each insert. If ROWCOL, the hash of the row + column family + column family qualifier will be added to the bloom on each key insert.
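
Following up on the setBloomFilterType call quoted above, here is a minimal sketch of enabling a row-level Bloom filter when defining a table. It assumes an HBase 0.96+ client, where BloomType lives in org.apache.hadoop.hbase.regionserver; the table and column family names are placeholders:

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.BloomType;

public class BloomFilterExample {
    public static HTableDescriptor describeTable() {
        // Enable a row-level Bloom filter on the "colfam" column family.
        HColumnDescriptor cf = new HColumnDescriptor("colfam");
        cf.setBloomFilterType(BloomType.ROW);

        HTableDescriptor table = new HTableDescriptor(TableName.valueOf("mytable"));
        table.addFamily(cf);
        return table;
    }
}

The resulting descriptor would then be passed to the admin API (or the equivalent shell command) to actually create the table.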

10. You have two tables in an existing RDBMS. One contains information about the products you sell (name, size, color, etc.) The other contains images of the products in JPEG format. These tables are frequently joined in queries to your database. You would like to move this data into HBase. What is the most efficient schema design for this scenario?
Correct Answer: Create a single table with two column families

Explanation

Access patterns are an important factor in HBase schema design. Even though the two tables in this scenario have very different data sizes and formats, it is better to store them in one table if you are accessing them together most of the time.

Column families allow for separation of data. You can store different types and formats of data in different column families. Attributes such as compression, Bloom filters, and replication are set on a per-column-family basis. In this example, it is better to store the product information and the product images in two different column families within one table.

Further Reading

HBase documentation on column families, section 5.6, especially the part:

Physically, all column family members are stored together on the filesystem. Because tunings and storage specifications are done at the column family level, it is advised that all column family members have the same general access pattern and size characteristics.

11. You need to create a Blogs table in HBase. The table will consist of a single column family called Content and two column qualifiers, Author and Comment. What HBase shell command should you use to create this table?
Correct Answer: create 'Blogs', 'Content'

Explanation

When you create an HBase table, you need to specify the table name and the column family name.
In the HBase shell, you can create a table with the command:
create 'myTable', 'myColumnFamily'

For this example:
  • Table name: 'Blogs'
  • ColumnFamily: 'Content'
We can create the table and verify it with the describe command:
hbase> create 'Blogs', 'Content'
hbase> describe 'Blogs'
{NAME => 'Blogs', FAMILIES => [{NAME => 'Content', ....

Further Reading

see the HBase Shell commands for create

Create table; pass table name, a dictionary of specifications per column family, and optionally a dictionary of table configuration. Dictionaries are described below in the GENERAL NOTES section.
Examples:
hbase> create 't1', {NAME => 'f1', VERSIONS => 5}
hbase> create 't1', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'}
hbase> # The above in shorthand would be the following:
hbase> create 't1', 'f1', 'f2', 'f3'
hbase> create 't1', {NAME => 'f1', VERSIONS => 1, TTL => 2592000, BLOCKCACHE => true}

12. From within an HBase application you are performing a check and put operation with the following code:
table.checkAndPut(Bytes.toBytes("rowkey"), Bytes.toBytes("colfam"), Bytes.toBytes("qualifier"), Bytes.toBytes("barvalue"), newrow);

Which describes this check and put operation?

Correct Answer: Check if rowkey/colfam/qualifier exists and has the cell value "barvalue". If so, put the values in newrow and return "true".

Explanation

The method checkAndPut executes the put only if the cell identified by the given row, column family, and column qualifier matches the expected value; in that case it returns "true" and applies the put with the new value. If the expected value is not present in the row, it returns "false" and the put is not executed.
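
For illustration, a complete call might look like the following sketch, written against the older HTable client API (pre-1.0); the table name and cell values are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckAndPutExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "mytable"); // placeholder table name

        // The new values to apply if the check succeeds.
        Put newRow = new Put(Bytes.toBytes("rowkey"));
        newRow.add(Bytes.toBytes("colfam"), Bytes.toBytes("qualifier"),
                   Bytes.toBytes("newvalue"));

        // Atomically: apply newRow only if colfam:qualifier of "rowkey"
        // currently holds "barvalue". Returns true if the put was applied.
        boolean applied = table.checkAndPut(
                Bytes.toBytes("rowkey"), Bytes.toBytes("colfam"),
                Bytes.toBytes("qualifier"), Bytes.toBytes("barvalue"), newRow);

        System.out.println("Put applied: " + applied);
        table.close();
    }
}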

Further Reading

HBase documentation on checkAndPut.

Data Science Essentials (DS-200)

13. What is the best way to evaluate the quality of the model found by an unsupervised algorithm like k-means clustering, given metrics for the cost of the clustering (how well it fits the data) and its stability (how similar the clusters are across multiple runs over the same data)?
Correct Answer: The lowest cost clustering subject to a stability constraint

Explanation

There is a tradeoff between cost and stability in unsupervised learning. The more tightly you fit the data, the less stable the model will be, and vice versa. The idea is to find a good balance with more weight given to the cost. Typically a good approach is to set a stability threshold and select the model that achieves the lowest cost above the stability threshold.
  • Yale Algorithms in Data Mining - Lecture 10: k-means clustering
  • Evaluation of Stability of k-means Cluster Ensembles with Respect to Random Initialization

14. A sandwich shop studies the number of men, and women, that enter the shop during the lunch hour from noon to 1pm each day. They find that the number of men that enter can be modeled as a random variable with distribution Poisson(M), and likewise the number of women that enter as Poisson(W). What is likely to be the best model of the total number of customers that enter during the lunch hour?
Correct Answer: Poisson(M+W)

Explanation

The total number of customers is the sum of the number of men and women. The sum of two Poisson random variables also follows a Poisson distribution with rate equal to the sum of their rates. The Normal and Binomial distribution can approximate the Poisson distribution in certain cases, but the expressions above do not approximate Poisson(M+W).

15. Which condition supports the choice of a support vector machine over logistic regression for a classification problem?
Correct Answer: The test set will be dense, and contain examples close to the decision boundary learned from the training set

Explanation

The SVM algorithm is a maximum margin classifier, and tries to pick a decision boundary that creates the widest margin between classes, rather than just any boundary that separates the classes. This helps generalization to test data, since it is less likely to misclassify points near the decision boundary, as the boundary maintains a large margin from training examples.

SVMs are not particularly better at multi-label classification. Linear separability is not required for either classification technique, and does not relate directly to an advantage of SVMs. SVMs are not particularly more suited to low-dimensional data.