Category: Big Data

Spark SQL

Spark SQL is the Spark component for structured data processing. It provides a programming abstraction called DataFrame and can also act as a distributed SQL query engine.

DataFrame

A DataFrame is a distributed collection of data organized into named columns, conceptually equivalent to a table in a relational database. DataFrames can be built from different sources:
Structured text files (e.g., CSV files, JSON files)
Existing RDDs
Hive tables
External relational databases


Creating a DataFrame from JSON files

Spark SQL provides an API that allows creating a DataFrame directly from a text file in which each line contains a JSON object (hence, the input is not strictly a standard JSON file).

The example below illustrates this.
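A minimal sketch, assuming the Spark 2.x Java API (where a DataFrame is represented as a Dataset<Row>) and a placeholder input file people.json containing one JSON object per line, could look like this:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class JsonDataFrameExample {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().appName("JsonDataFrameExample").getOrCreate();

            // Each line of people.json is expected to contain one JSON object,
            // e.g. {"name":"Paolo", "age":40}
            Dataset<Row> df = spark.read().json("people.json");

            df.show();        // print the first rows
            df.printSchema(); // print the inferred schema

            spark.stop();
        }
    }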

Creating a DataFrame from an existing RDD

Spark SQL provides an API that allows creating a DataFrame from an existing RDD.
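A minimal sketch of this approach, again assuming the Spark 2.x Java API and a hypothetical Person bean used to describe the schema, could be:

    import java.io.Serializable;
    import java.util.Arrays;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class RddToDataFrameExample {
        // hypothetical JavaBean describing the schema of the resulting DataFrame
        public static class Person implements Serializable {
            private String name;
            private int age;
            public Person(String name, int age) { this.name = name; this.age = age; }
            public String getName() { return name; }
            public void setName(String name) { this.name = name; }
            public int getAge() { return age; }
            public void setAge(int age) { this.age = age; }
        }

        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().appName("RddToDataFrame").getOrCreate();
            JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

            // an existing RDD of Person objects
            JavaRDD<Person> people = sc.parallelize(
                    Arrays.asList(new Person("Paolo", 40), new Person("Giulia", 35)));

            // reflection on the Person bean is used to infer the DataFrame schema
            Dataset<Row> df = spark.createDataFrame(people, Person.class);
            df.show();

            spark.stop();
        }
    }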

DataFrame Operations

A set of operations can be performed on DataFrames (a usage sketch follows the list):
1. show(), used to show the elements of a DataFrame
2. printSchema(), used to show the schema of the DataFrame
3. count(), used to return the number of elements of a DataFrame
4. distinct(), used to return a DataFrame without duplicates
5. select(), used to select specific columns of the DataFrame
6. filter(), used to select the elements satisfying a specific constraint
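Continuing the people.json sketch above (the column names name and age are the hypothetical ones used there), these operations could be invoked as follows:

    // df is the DataFrame created from people.json in the earlier sketch
    df.show();                        // show the first rows
    df.printSchema();                 // show the schema of the DataFrame
    System.out.println(df.count());   // number of rows
    df.distinct().show();             // rows without duplicates
    df.select("name").show();         // only the "name" column
    df.filter("age > 30").show();     // rows satisfying a constraint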

Hadoop: internals

The explanation of Hadoop internals covers the anatomy of a MapReduce job, the scheduling algorithms, and the shuffle and sort phases. We will also look at failures, as seen through the exceptions raised by our programs.

ANATOMY OF A MAPREDUCE JOB RUN

Starting from the client node, a new job is submitted from the client JVM and a new instance of JobClient is created. After some initial internal checks, the job is handed to the JobTracker, which initializes it: it creates an object that encapsulates the job's tasks and places it in an internal queue used for scheduling.

The JobTracker then retrieves the input splits from the shared file system and waits for requests from the TaskTrackers, answering each request with a task to perform. The TaskTracker then launches a child JVM in which the task is executed.

SCHEDULING

Heartbeat-based mechanism: the TaskTracker periodically sends heartbeats to the JobTracker.

The JobTracker operates with a FIFO scheduler by default: with this algorithm it chooses the tasks to send in order of arrival, giving priority to map tasks.

FAILURES

A task failure can happen because of:

  1. a runtime exception during the execution of a map or reduce task. In this case the child JVM sends a report back to the TaskTracker, which stores the report in the log file and marks the task attempt as failed.
  2. a hanging task: in practice the TaskTracker notices no progress updates.
  3. a kill issued by the TaskTracker, which ends the life of the child JVM.

In all of these cases the JobTracker is notified of the failed task and re-schedules it. If a task fails more times than the configured maximum, it is not re-scheduled anymore and the whole job fails.

A TaskTracker failure can happen either because of a crash or because the node is running very slowly. In both cases the TaskTracker stops sending heartbeats to the JobTracker; the JobTracker waits for a timeout (10 minutes by default) and then removes the TaskTracker from its scheduling pool.
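As an illustration, these thresholds can be tuned through the job configuration. The property names below are a hedged sketch based on the newer (MRv2) releases; older releases use different mapred.* names:

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // maximum number of attempts before a map or reduce task is declared failed
    conf.setInt("mapreduce.map.maxattempts", 4);
    conf.setInt("mapreduce.reduce.maxattempts", 4);
    // a task that reports no progress for this many milliseconds is considered hanging
    conf.setLong("mapreduce.task.timeout", 600000L);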

INTERNALS

The framework guarantees that the input to every reducer is sorted by key. The process by which the map output is transferred to the reducers and sorted is known as the shuffle. Let us look at the process in more detail.

The output of the map() method is not simply written to the local disk. It first enters a circular memory buffer (100 MB by default); when the buffer reaches a threshold, a background thread spills its content to disk. During this spilling (or copy) phase the data are divided into several partitions, one for each reducer, and within each partition the data are sorted.


On the reducer side the map output is copied to the memory of the reducer's TaskTracker if it fits, otherwise to disk. A background thread then merges all the partial inputs into larger, sorted files; this is called the sort phase. At the end of this phase the data are passed to the reduce() method.
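As a hedged sketch, the buffer size and the spill threshold mentioned above correspond to configuration properties (MRv2 names shown; older releases use io.sort.* names):

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // size in MB of the circular in-memory buffer used on the map side
    conf.setInt("mapreduce.task.io.sort.mb", 100);
    // fraction of the buffer that, once filled, triggers the spill to disk
    conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);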

Hadoop: multiple inputs

In some applications data are read from two or more datasets, and these datasets can have different formats. Hadoop supports this situation by allowing a job to read from different sources: we define a different mapper for each dataset. The rule to follow is that the key-value pairs emitted by all the mappers must be consistent, so that the following steps can process them as usual.
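A minimal sketch of the driver configuration, assuming two hypothetical mapper classes Mapper1 and Mapper2 (one per dataset) that emit consistent key-value pairs:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    // inside the driver: one input path, input format and mapper per dataset
    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, Mapper1.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, Mapper2.class);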

 

MapReduce Programming Paradigm

The MapReduce paradigm is based on the observation that very large datasets cannot be processed with conventional strategies, so parallel distributed programs are built to map, filter, sort, combine and reduce them, producing much smaller result datasets.

This process depends on what we are interested in and on the dataset, so every choice is driven by the circumstances. For the sake of study, it is therefore important to see the several ways in which a MapReduce program can be adapted to specific uses.

Combiner

The input dataset is split into several portions (e.g., one input file may produce two input splits), and the same number of mappers is instantiated. The key-value pairs (KVPs) emitted by the mappers are analyzed in main memory and aggregated by the combiners. All of this is done to reduce the amount of network traffic.
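In the driver the combiner is plugged in with a single call; as a hedged sketch, assuming a hypothetical WordCountCombiner class that extends Reducer and performs the same aggregation as the reducer:

    // the combiner runs in main memory on the mapper side, on the KVPs of a single mapper
    job.setCombinerClass(WordCountCombiner.class);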

Personalized Data Types

They are used when the keys or the values are complex data types. To implement them we need two different interfaces: Writable for values and WritableComparable for keys.
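A hedged sketch of a custom value type implementing the Writable interface (keys would instead implement WritableComparable, which additionally requires compareTo()); the SumCountWritable class is a hypothetical example:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    // hypothetical complex value type storing a sum and a count
    public class SumCountWritable implements Writable {
        private int sum;
        private int count;

        public void set(int sum, int count) { this.sum = sum; this.count = count; }
        public int getSum() { return sum; }
        public int getCount() { return count; }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(sum);
            out.writeInt(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            sum = in.readInt();
            count = in.readInt();
        }
    }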

Sharing Parameters

In some circumstances it is important to share some information among the driver, the mappers and the reducers. The Configuration object can be used for this purpose, since it stores a list of (name, value) properties.
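A minimal sketch, where the property name threshold is a hypothetical example:

    // in the driver, before submitting the job
    job.getConfiguration().set("threshold", "10");

    // in the mapper or reducer, typically inside setup()
    int threshold = Integer.parseInt(context.getConfiguration().get("threshold"));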

Counters

Hadoop provides a set of built-in counters used to store statistics about jobs, mappers, reducers and the other parts of the framework.
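User-defined counters can be created as well; a hedged sketch, where the MY_COUNTERS enum is a hypothetical example:

    // hypothetical user-defined counter group, declared e.g. in the driver class
    public static enum MY_COUNTERS { MALFORMED_RECORDS }

    // inside map() or reduce(): increment the counter when a malformed record is found
    context.getCounter(MY_COUNTERS.MALFORMED_RECORDS).increment(1);

    // in the driver, after the job has completed: read the final value
    long malformed = job.getCounters().findCounter(MY_COUNTERS.MALFORMED_RECORDS).getValue();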

Map-Only Job

In some applications all the work can be performed by the mappers.
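In that case the reduce phase is disabled in the driver, for example:

    // a map-only job: the output of the mappers is written directly to HDFS
    job.setNumReduceTasks(0);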

In-Mapper Combiner

In this pattern we exploit two methods of the Mapper class: setup() and cleanup(). setup() is called once per mapper, before the many calls of the map() method, and can be used to initialize in-mapper variables. cleanup() is also called once per mapper, after the many calls of the map() method, and can be used to emit the aggregated values.
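A hedged sketch of a word-count mapper using this pattern (the class name and the tokenization are illustrative):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // hypothetical word-count mapper with in-mapper combining
    public class InMapperCombinerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Map<String, Integer> counts;

        @Override
        protected void setup(Context context) {
            // called once per mapper, before the calls of map(): initialize the in-mapper state
            counts = new HashMap<>();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) {
            for (String word : value.toString().split("\\s+")) {
                counts.merge(word, 1, Integer::sum);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // called once per mapper, after the calls of map(): emit the aggregated pairs
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                context.write(new Text(e.getKey()), new IntWritable(e.getValue()));
            }
        }
    }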

Hadoop: implementation of MapReduce

The Hadoop framework does a lot of work that usually goes unnoticed, because it all happens silently behind the programmer's point of view; the programmer only has to write a few selected parts of the program, specifically the mapper and the reducer.

Is it acceptable, then, that many programmers do not know the complexity behind Hadoop? Yes, because they can focus exclusively on the development of specific functions, obtaining the best result with the least effort. Our focus here is on the main parts of the framework, just enough to understand what happens behind the scenes and to develop straightforward programs effectively.

Strictly speaking, we do not focus on these tasks:
– parallel execution of the map and reduce phases
– execution of the shuffle and sort phases
– scheduling of subtasks
– synchronization
because they are delegated to the framework. We focus instead on the development of these parts:
– driver
– mapper
– reducer

THE PROCESS

  1. The input key-value pairs (KVPs) are read from the HDFS file system, and the map() method is invoked on each of them, producing intermediate KVPs. These intermediate pairs are transient: they are stored on the local file system of each node;
  2. The intermediate KVPs are then grouped by key and merged by means of the shuffle and sort phases, producing <key, [list of values]> pairs;
  3. Finally the reduce() method is invoked on each <key, [list of values]> pair. The reducer uses them to produce the final KVPs, which this time are stored on HDFS.

Let us see which parts compose a program (a skeleton of the driver is sketched after this list):
– driver, where we need to implement main(…) and run(…) methods,
– mapper, where we need to implement the map(…) method,
– reducer, where we need to implement the reduce(…) method.
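A hedged skeleton of the driver, assuming hypothetical MyMapper and MyReducer classes implemented separately:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    // hypothetical driver; MyMapper and MyReducer are implemented separately
    public class DriverExample extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            Job job = Job.getInstance(getConf(), "example job");
            job.setJarByClass(DriverExample.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new Configuration(), new DriverExample(), args);
            System.exit(exitCode);
        }
    }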

Spark MLlib

Spark MLlib is the Spark component that provides machine learning and data mining algorithms: pre-processing techniques, classification, clustering and itemset mining.

DATA TYPES

Spark MLlib is based on a set of basic local and distributed data types, such as:
1. local vector,
2. labeled point,
3. distributed matrix,
4. DataFrames, built on top of these.

MAIN CONCEPTS

This library makes use of the following main concepts (a sketch follows the list):
1. DataFrame, used as the ML dataset;
2. Transformer, an algorithm that transforms one DataFrame into another;
3. Estimator, an algorithm that is fit on a DataFrame to build a model;
4. Pipeline, used to chain Transformers and Estimators into a single workflow;
5. Parameter: Transformers and Estimators share a common API for specifying their parameters.
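A hedged sketch of these concepts working together, assuming placeholder DataFrames trainingData and testData with columns text and label:

    import org.apache.spark.ml.Pipeline;
    import org.apache.spark.ml.PipelineModel;
    import org.apache.spark.ml.PipelineStage;
    import org.apache.spark.ml.classification.LogisticRegression;
    import org.apache.spark.ml.feature.HashingTF;
    import org.apache.spark.ml.feature.Tokenizer;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Transformers: they map one DataFrame into another
    Tokenizer tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words");
    HashingTF hashingTF = new HashingTF().setInputCol("words").setOutputCol("features");
    // Estimator: it is fit on a DataFrame and produces a model
    LogisticRegression lr = new LogisticRegression().setMaxIter(10);

    // Pipeline: chains the Transformers and the Estimator into a single workflow
    Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]{tokenizer, hashingTF, lr});

    PipelineModel model = pipeline.fit(trainingData);      // builds the model
    Dataset<Row> predictions = model.transform(testData);  // applies it to new data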

CLASSIFICATION ALGORITHMS

Spark MLlib provides only a limited set of classification algorithms:
1. Logistic regression
2. Decision trees
3. SVMs
4. Naive Bayes

CLUSTERING ALGORITHMS

Spark MLlib provides only a limited set of clustering algorithms:
1. K-means
2. Gaussian mixture

REGRESSION ALGORITHMS

Spark MLlib also provides a set of regression algorithms.

ITEMSET and ASSOCIATION RULE MINING

Spark MLlib provides the following association rule mining algorithm:
1. FP-Growth

Spark: key-value pairs

Spark supports RDDs of key-value pairs (pair RDDs). They can be created by applying suitable transformations to regular RDDs, and they are characterized by specific operations; a sketch follows the lists below.

The transformations that can be performed on a single pair RDD are:
1. reduceByKey()
2. foldByKey()
3. combineByKey()
4. groupByKey()
5. mapValues()
6. flatMapValues()
7. keys()
8. values()
9. sortByKey()

The transformations that can be performed on two pair RDDs are:
1. subtractByKey()
2. join()
3. cogroup()

The actions that can be performed on a pair RDD are:
1. countByKey()
2. collectAsMap()
3. lookup()

The transformations that return a DoubleRDD are:
1. mapToDouble()
2. flatMapToDouble()

The actions that can be performed on a DoubleRDD are:
1. sum()
2. mean()
3. stdev()
4. variance()
5. max()
6. min()
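A minimal end-to-end sketch in the Java API (the word list is a placeholder):

    import java.util.Arrays;
    import java.util.Map;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaDoubleRDD;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class PairRddExample {
        public static void main(String[] args) {
            // local execution for the sake of the example
            JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("PairRddExample").setMaster("local[*]"));

            JavaRDD<String> words = sc.parallelize(Arrays.asList("a", "b", "a", "c", "b", "a"));

            // create a pair RDD by applying a transformation
            JavaPairRDD<String, Integer> pairs = words.mapToPair(w -> new Tuple2<>(w, 1));

            // transformation on a pair RDD
            JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

            // action on a pair RDD
            Map<String, Long> occurrences = pairs.countByKey();

            // transformation returning a DoubleRDD, followed by actions on it
            JavaDoubleRDD values = counts.mapToDouble(t -> t._2().doubleValue());
            System.out.println(occurrences + " sum=" + values.sum() + " mean=" + values.mean());

            sc.close();
        }
    }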

HOW-TO Run a Spark Application

Spark programs are executed by means of the spark-submit command.

This post is part of the Big Data Trail, so if you want a more general view of the whole topic, please follow the hyperlink.

Before reading this, you should already be familiar with:
1. Big Data
2. Overview of Spark

After reading this, you will know:
1. the spark-submit command and its options
2. how to run a Spark application on a local machine

Some of the commonly used options are (an example invocation follows the list):
--class: the entry point of your application (the main class)
--master: the master URL of the cluster
--deploy-mode: whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client)
--conf: an arbitrary Spark configuration property in key=value format
Others are related to the application:
application-jar: the path to a jar including your application and its dependencies
application-arguments: the arguments passed to the main method of the main class
Others are related to the executors:
--num-executors, to specify their number
--executor-cores, to specify the number of cores for each executor
--executor-memory, to specify the main memory of each executor
Finally, others are related to the driver:
--driver-cores, to specify the number of cores to be used by the driver
--driver-memory, to specify the main memory of the driver
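For example, a hedged sketch of running a hypothetical application packaged in MyApp.jar (with main class com.example.MyApp) on the local machine, using 4 local threads, could be:

    spark-submit --class com.example.MyApp \
      --master local[4] \
      --deploy-mode client \
      MyApp.jar inputFolder outputFolder

To run on a cluster you would instead set --master to the cluster's master URL and add the executor and driver options listed above.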

Other relevant resources are available at:

  • https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_develop_run.html
  • http://spark.apache.org/docs/latest/submitting-applications.html