Wednesday, 6 January 2016

Interview Questions & Answers on Apache Spark [Part 2]

Q1: Say I have a huge list of numbers in an RDD (say myrdd), and I wrote the following code to compute the average:
def myAvg(x, y):
 return (x+y)/2.0;
avg = myrdd.reduce(myAvg);
What is wrong with it? And how would you correct it?
Ans: The pairwise average function is commutative but not associative, so the result depends on how Spark groups the elements across partitions.
I would simply sum the numbers and then divide by the count.
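# Note: this definition shadows Python's built-in sum()
def sum(x, y):
 return x+y
total = myrdd.reduce(sum)
# float() avoids integer division under Python 2
avg = total / float(myrdd.count())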
The only problem with the above code is that the total might become very large and overflow. So I would rather divide each number by the count first and then sum, in the following way:
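cnt = myrdd.count()
def divideByCnt(x):
 # float() avoids integer division under Python 2
 return x / float(cnt)
myrdd1 = myrdd.map(divideByCnt)
# Reduce the scaled RDD (myrdd1), not the original one
avg = myrdd1.reduce(sum)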
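
To see concretely why the pairwise average is not a safe reducer, here is a small plain-Python sketch. It needs no Spark: `functools.reduce` stands in for `RDD.reduce`, and the names `pairwise_avg`, `left_grouped`, `right_grouped` and `merge_sum_count` are made up for illustration. It groups the same four numbers in two different ways and gets two different answers, neither of which is the true mean. It also shows one possible alternative, which is an assumption and not part of the answer above: reducing over (sum, count) pairs, which merge associatively.

```python
from functools import reduce

def pairwise_avg(x, y):
    return (x + y) / 2.0

nums = [1, 2, 3, 4]

# Grouping from the left: ((1 avg 2) avg 3) avg 4
left_grouped = reduce(pairwise_avg, nums)

# Grouping from the right: 1 avg (2 avg (3 avg 4))
right_grouped = pairwise_avg(1, pairwise_avg(2, pairwise_avg(3, 4)))

# The true mean, for comparison
true_mean = sum(nums) / float(len(nums))

# (sum, count) pairs can be merged in any grouping without changing the result.
def merge_sum_count(a, b):
    return (a[0] + b[0], a[1] + b[1])

total, count = reduce(merge_sum_count, [(n, 1) for n in nums])
mean_from_pairs = total / float(count)
```

On an RDD the same idea would look roughly like myrdd.map(lambda n: (n, 1)).reduce(merge_sum_count), followed by the final division on the driver.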

Q2: Say I have a huge list of numbers in a file in HDFS. Each line has one number, and I want to compute the square root of the sum of squares of these numbers. How would you do it?
Ans:
# First load the file from HDFS as an RDD
numsAsText = sc.textFile("hdfs://namenode:9000/user/kalyan/mynumbersfile.txt")
# Define the function that parses a line and squares the number
def toSqInt(line):
 v = int(line)
 return v*v
# Run the function on the RDD as a map transformation
nums = numsAsText.map(toSqInt)

# Run the summation as a reduce action
from operator import add
total = nums.reduce(add)

# Finally compute the square root, for which we need to import math
import math
print(math.sqrt(total))

Q3: Is the following approach correct? Is sqrtOfSumOfSq a valid reducer?

import math
numsAsText = sc.textFile("hdfs://namenode:9000/user/kalyan/mynumbersfile.txt")
def toInt(line):
 return int(line)
nums = numsAsText.map(toInt)
def sqrtOfSumOfSq(x, y):
 return math.sqrt(x*x+y*y)
# The reducer already takes the square root at every step, so the result is final
total = nums.reduce(sqrtOfSumOfSq)
print(total)
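Ans: Yes. The approach is correct and sqrtOfSumOfSq is a valid reducer. It is commutative, and it is associative because sqrt(sqrt(x*x + y*y)^2 + z*z) = sqrt(x*x + y*y + z*z), so the grouping of the elements does not change the result (up to floating-point rounding).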
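
The associativity argument can be checked locally with a minimal plain-Python sketch. Here `functools.reduce` stands in for `RDD.reduce`, and the three sample numbers and the variable names are assumptions chosen for illustration. Both groupings should agree with the direct computation up to floating-point rounding.

```python
import math
from functools import reduce

def sqrtOfSumOfSq(x, y):
    return math.sqrt(x * x + y * y)

nums = [3, 4, 12]

# Grouping from the left: (3 op 4) op 12
left_grouped = reduce(sqrtOfSumOfSq, nums)

# Grouping from the right: 3 op (4 op 12)
right_grouped = sqrtOfSumOfSq(3, sqrtOfSumOfSq(4, 12))

# Direct computation: square everything, sum, then take one square root
direct = math.sqrt(sum(n * n for n in nums))

# Compare with a tolerance, because the intermediate square roots are rounded
agree = abs(left_grouped - right_grouped) < 1e-9 and abs(left_grouped - direct) < 1e-9
```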

Q4: Could you compare the pros and cons of your approach (in Question 2 above) and my approach (in Question 3 above)?
Ans:
You are doing the squaring and the square root inside the reduce action, while in my approach I square in map() and sum in reduce().
My approach will be faster because in your case the reducer is heavier, as it calls math.sqrt(), and the reducer is executed approximately n-1 times for an RDD of n elements.
The only downside of my approach is a much higher chance of integer overflow, because I am summing the squares directly, whereas your reducer takes the square root at every step and so keeps the intermediate values small.
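
The "approximately n-1 times" figure can be illustrated with a rough plain-Python sketch. The counter, the sample data and the three hand-made "partitions" are assumptions for illustration only; real Spark decides the partitioning itself. The sketch counts reducer calls for a single sequential reduce, and again when each partition is reduced separately and the partial results are merged.

```python
from functools import reduce

call_count = {"calls": 0}

def counting_sum(x, y):
    # Record every invocation of the reducer
    call_count["calls"] += 1
    return x + y

nums = list(range(1, 11))  # 10 elements

# One sequential reduce over all elements
total = reduce(counting_sum, nums)
sequential_calls = call_count["calls"]

# Simulated partitions: reduce each one, then merge the partial results
partitions = [nums[0:4], nums[4:7], nums[7:10]]
call_count["calls"] = 0
partials = [reduce(counting_sum, part) for part in partitions]
partitioned_total = reduce(counting_sum, partials)
partitioned_calls = call_count["calls"]
```

Both paths make 9 reducer calls for 10 elements, which is why any extra work inside the reducer, such as math.sqrt(), is paid roughly once per element.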

Q5: If you have to compute the total count of each unique word in Spark, how would you go about it?
Ans:

# This loads bigtextfile.txt as an RDD in Spark
lines = sc.textFile("hdfs://namenode:9000/user/kalyan/bigtextfile.txt")

# Define a function that breaks each line into words
def toWords(line):
    return line.split()

# Run the toWords function on each element of RDD on spark as flatMap transformation.
# We are going to flatMap instead of map because our function is returning multiple values.

words = lines.flatMap(toWords);

# Convert each word into a (key, value) pair. Here the key is the word itself and the value is 1.
def toTuple(word):
    return (word, 1)

wordsTuple = words.map(toTuple);

# Now we can easily apply the reduceByKey() transformation.
# Note: this definition shadows Python's built-in sum()
def sum(x, y):
    return x+y

counts = wordsTuple.reduceByKey(sum)

# Now collect the results to the driver and print them
print(counts.collect())
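
To make the three stages concrete, here is a plain-Python sketch that mimics flatMap, map and reduceByKey on a tiny in-memory input. The two sample lines are made up, and the list comprehensions and dictionary only imitate what Spark does in a distributed way; this is not how Spark implements them.

```python
from collections import defaultdict

lines = ["to be or not to be", "to do is to be"]

# flatMap: each line yields several words, flattened into one list
words = [word for line in lines for word in line.split()]

# map: each word becomes a (word, 1) pair
pairs = [(word, 1) for word in words]

# reduceByKey: values that share a key are combined with addition
counts = defaultdict(int)
for word, one in pairs:
    counts[word] += one
counts = dict(counts)
```

With plain map instead of flatMap, the first step would produce a list of two word lists rather than one flat list of eleven words, which is why the answer uses flatMap.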

Q6: In a very large text file, you just want to check whether a particular keyword exists. How would you do this using Spark?
Ans:
lines = sc.textFile("hdfs://namenode:9000/user/kalyan/bigtextfile.txt")
def isFound(line):
 if line.find("mykeyword") > -1:
  return 1
 return 0
foundBits = lines.map(isFound)
# Add up the flags; a lambda avoids overwriting a function named sum with its own result
total = foundBits.reduce(lambda x, y: x + y)
if total > 0:
 print("FOUND")
else:
 print("NOT FOUND")

Q7: Can you improve the performance of the code in the previous answer?
Ans: Yes.
The search does not stop even after the word we are looking for has been found. Our map code keeps executing on all the nodes, which is very inefficient.
We could use an accumulator to report whether the word has been found and then cancel the job. Something along these lines:
import threading
from time import sleep
result = "Not Set"
lock = threading.Lock()
accum = sc.accumulator(0)
def map_func(line):
 # Introduce a delay to emulate slowness
 sleep(1)
 if line.find("Adventures") > -1:
  accum.add(1)
  return 1
 return 0
def start_job():
 global result
 try:
  # Tag the jobs submitted below so they can be cancelled as a group
  sc.setJobGroup("job_to_cancel", "some description")
  lines = sc.textFile("hdfs://namenode:9000/user/kalyan/wordcount/input/big.txt")
  result = lines.map(map_func)
  # count() forces a scan of every line unless the job is cancelled first
  result.count()
 except Exception as e:
  result = "Cancelled"
 lock.release()
def stop_job():
 # Poll the accumulator on the driver and cancel once 3 matches are reported
 while accum.value < 3:
  sleep(1)
 sc.cancelJobGroup("job_to_cancel")
# Hold the lock until start_job finishes or is cancelled
lock.acquire()
threading.Thread(target=start_job).start()
threading.Thread(target=stop_job).start()
# Block the main thread until start_job releases the lock
lock.acquire()
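
The core idea of the answer above, stopping as soon as the keyword turns up instead of examining every line, can be shown with a minimal plain-Python sketch. The function name `keyword_exists` and the sample lines are hypothetical and only illustrate the short-circuit; they say nothing about how Spark schedules tasks.

```python
def keyword_exists(lines, keyword):
    """Return (found, examined): whether the keyword occurs and how many lines were read."""
    examined = 0
    for line in lines:
        examined += 1
        if keyword in line:
            # Stop at the first match instead of scanning the rest
            return True, examined
    return False, examined

sample = [
    "Down the Rabbit-Hole",
    "Alice's Adventures in Wonderland",
    "The Pool of Tears",
    "A Mad Tea-Party",
]

found, examined = keyword_exists(sample, "Adventures")
missing, examined_all = keyword_exists(sample, "mykeyword")
```

On an RDD, a comparable short-circuit might be obtained with lines.filter(lambda l: "Adventures" in l).take(1), since take() scans partitions incrementally and stops once it has enough results. That is a suggestion under the same assumptions, not something the answer above uses.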
