Running Spark in Cluster Mode from the Windows Command Prompt

Starting a Master (in terminal 1)

  1. Navigate to the c:\spark\bin directory
  2. Start the standalone master by running the command below (note the master URL it prints, e.g. spark://IP:PORT)
spark-class org.apache.spark.deploy.master.Master
  3. Check the web UI of the Spark standalone cluster at http://localhost:8080/ (a JSON status check is sketched below)
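
The same status shown in the web UI is also exposed as JSON at http://localhost:8080/json, which is handy for a scripted health check. A minimal sketch (Python 3; the exact field names vary slightly between Spark versions, so treat them as assumptions):

# check_master.py - poll the standalone master's JSON status endpoint.
# Assumes the master web UI is on the default port 8080.
import json
import urllib.request  # Python 3

with urllib.request.urlopen("http://localhost:8080/json") as response:
    status = json.loads(response.read().decode("utf-8"))

# "url" and "workers" are the fields of interest; names may differ by version
print(status.get("url"))                  # e.g. spark://192.168.0.5:7077
print(len(status.get("workers", [])))     # number of registered workers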

Starting a Worker (in terminal 2)

  1. Navigate to the c:\spark\bin directory
  2. Start a worker by running the command below (use the master URL from above, e.g. spark://192.168.0.5:7077)
spark-class org.apache.spark.deploy.worker.Worker spark://192.168.0.5:7077
# Optionally cap the worker's resources:
# spark-class org.apache.spark.deploy.worker.Worker --cores 2 --memory 4g spark://192.168.0.5:7077

Running the Shell in Cluster Mode

Run the command below in the terminal

pyspark --master spark://192.168.0.5:7077
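
Once the shell is up, a quick way to confirm it is attached to the standalone master (rather than running locally) is to check sc.master and run a trivial job, which will then appear under Running Applications in the master web UI:

# paste at the >>> prompt; pyspark has already created the SparkContext as sc
print(sc.master)                      # should print spark://192.168.0.5:7077

# trivial job to confirm executors on the worker actually run tasks
rdd = sc.parallelize(range(1000), 4)  # 4 partitions
print(rdd.count())                    # 1000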

Submitting a Job in Cluster Mode

  1. Navigate to the directory containing the Python file
cd c:/users/yohan/documents/spark
  2. Submit the job, specifying the master URL and, optionally, an executor memory value such as 2g (a minimal test.py is sketched after this list)
spark-submit --master spark://192.168.0.5:7077 --executor-memory 2g test.py
  3. Kill a submitted driver as below (pass the master URL and the driver ID shown in the master web UI)
spark-class org.apache.spark.deploy.Client kill spark://192.168.0.5:7077 <driver-id>
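
test.py itself is not shown above; a minimal placeholder along these lines would work (the file name and contents are only an assumption for illustration, any small PySpark script will do):

# test.py - minimal job for trying out spark-submit against the cluster.
# The master URL comes from the --master flag on spark-submit, so it is
# deliberately not hard-coded here.
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("TestSubmit")
sc = SparkContext(conf=conf)

# trivial distributed computation: sum of 0..999999
total = sc.parallelize(range(1000000)).sum()
print(total)

sc.stop()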


Query Performance: SQL Server vs. Spark

  1. Install the AdventureWorks2014 database on a local SQL Server instance
  2. Create a slow-running query to compare performance
  3. Decide how to measure the query execution time on each side
  4. Run the query in SSMS and export the result as a .csv file
SET STATISTICS TIME ON

SELECT	TOP 10000000 
		th.* 
FROM	Production.TransactionHistory th
JOIN	Production.TransactionHistoryArchive tha ON th.Quantity = tha.Quantity

SET STATISTICS TIME OFF
(10000000 row(s) affected)

 SQL Server Execution Times:
   *CPU time = 3813 ms,  elapsed time = 99440 ms.*
  5. Run the same query from PySpark (standalone) by submitting a job and export the result as a .csv file
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import time

conf = SparkConf().setMaster("local").setAppName("TestingSqlQuery")
sc = SparkContext(conf = conf)

query = """(   
SELECT	TOP 10000000 
		th.* 
FROM	Production.TransactionHistory th
JOIN	Production.TransactionHistoryArchive tha ON th.Quantity = tha.Quantity
) as Alias"""

sqlContext = SQLContext(sc)

start = time.time()

# read the query result from SQL Server over JDBC
df = sqlContext.read.format("jdbc").options(
  url="jdbc:sqlserver://localhost;databasename=AdventureWorks2014;integratedSecurity=true;",
  driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
  dbtable=query,
  numPartitions=4).load()

# write the result out as a CSV directory
df.write.csv('testing-sql-query-result.csv')

end = time.time()
print(str(end - start) + ' seconds')
PS C:\users\yohan\documents\spark> spark-submit testing-sql-query.py
17/05/22 22:06:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
*59.2020001411 seconds*
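
Note that the 59.2 s above includes writing the CSV output, while much of the SQL Server elapsed time is typically spent returning the 10,000,000 rows to SSMS, so the two figures are only a rough comparison. Because Spark evaluates lazily (.load() returns immediately), one way to time just the query and transfer is to force materialisation with count(); a sketch reusing the same connection settings:

# testing-sql-query-count.py - time the JDBC query without the CSV write.
import time
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setMaster("local").setAppName("TestingSqlQueryCountOnly")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

query = """(
SELECT TOP 10000000 th.*
FROM Production.TransactionHistory th
JOIN Production.TransactionHistoryArchive tha ON th.Quantity = tha.Quantity
) as Alias"""

df = sqlContext.read.format("jdbc").options(
    url="jdbc:sqlserver://localhost;databasename=AdventureWorks2014;integratedSecurity=true;",
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
    dbtable=query).load()

start = time.time()
print(df.count())                     # the action that actually runs the query
end = time.time()
print(str(end - start) + ' seconds (query and count only, no CSV write)')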

Installing Apache Spark on Windows

  1. Install a JDK (Java Development Kit). Keep track of where you installed the JDK; you’ll need that later.
  2. Download a pre-built version of Apache Spark
  3. If necessary, download and install WinRAR so you can extract the .tgz file you downloaded. http://www.rarlab.com/download.htm
  4. Extract the Spark archive, and copy its contents into C:\spark after creating that directory. You should end up with directories like c:\spark\bin, c:\spark\conf, etc.
  5. Download winutils.exe and move it into a C:\winutils\bin folder that you’ve created. (note, this is a 64-bit application. If you are on a 32-bit version of Windows, you’ll need to search for a 32-bit build of winutils.exe for Hadoop.)
  6. Open the c:\spark\conf folder, and make sure “File Name Extensions” is checked in the “view” tab of Windows Explorer. Rename the log4j.properties.template file to log4j.properties. Edit this file (using Wordpad or something similar) and change the error level from INFO to ERROR for log4j.rootCategory.
  7. Right-click your Windows menu, select Control Panel, System and Security, and then System. Click on “Advanced System Settings” and then the “Environment Variables” button.
  8. Add the following new USER variables:
     a. SPARK_HOME = c:\spark
     b. JAVA_HOME = the path you installed the JDK to in step 1, for example C:\Program Files\Java\jdk1.8.0_101
     c. HADOOP_HOME = c:\winutils
  9. Add the following paths to your PATH user variable: %SPARK_HOME%\bin and %JAVA_HOME%\bin
  10. Close the environment variable screen and the control panels.
  11. Install the latest Enthought Canopy
  12. Test it out!
      a. Open up a Windows command prompt in administrator mode.
      b. Enter cd c:\spark and then dir to get a directory listing.
      c. Look for a text file we can play with, like README.md or CHANGES.txt.
      d. Enter pyspark
      e. At this point you should have a >>> prompt. If not, double-check the steps above.
      f. Enter rdd = sc.textFile("README.md") (or whatever text file you’ve found)
      g. Enter rdd.count()
      h. You should get a count of the number of lines in that file! Congratulations, you just ran your first Spark program!
      i. Hit Ctrl-D to exit the Spark shell, and close the console window.
      j. You’ve got everything set up! Hooray!
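
The same sanity check can be run non-interactively with spark-submit once the environment variables are in place; a minimal sketch (the file name check-install.py is just an assumption):

# check-install.py - run with:  spark-submit check-install.py
import os
from pyspark import SparkConf, SparkContext

# confirm the variables from steps 8-9 are visible to Python
for name in ("SPARK_HOME", "JAVA_HOME", "HADOOP_HOME"):
    print(name + " = " + str(os.environ.get(name)))

conf = SparkConf().setMaster("local").setAppName("CheckInstall")
sc = SparkContext(conf=conf)

# count the lines of README.md, as in step 12
rdd = sc.textFile("c:/spark/README.md")
print(rdd.count())

sc.stop()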

Apache Spark support in Elastic

  1. Download the elasticsearch-hadoop adapter to read/write data in Elasticsearch
  2. Extract the adapter under the Spark directory, e.g. as c:\spark\elasticsearch-hadoop-5.4.0
  3. Open Visual Studio Code as administrator
  4. Create the script below to read an Elasticsearch index, e.g. products/kcosmetics
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("elasticsearch-hadoop")
sc = SparkContext(conf = conf)

# read in ES index/type "products/kcosmetics"
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf={ "es.resource" : "products/kcosmetics" })
print(es_rdd.first())

kcosmetics_availability = es_rdd.map(lambda item: ("key",{
    'id': item[0] , ## _id from products/kcosmetics
    'availability': item[1]['availability']
}))

# write the results to "products/kcosmetics_stocks"
kcosmetics_availability.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf={ 
        "es.index.auto.create": "true", # auto creating index as inserted
        "es.mapping.id": "id",          # auto mapping id as index id
        "es.resource" : "products/kcosmetics_stocks" })
  5. Create the c:/spark/external_jars directory and copy the elasticsearch-hadoop jar from the extracted adapter into it
  6. Add the following lines to c:/spark/conf/spark-defaults.conf so the jars are picked up
spark.driver.extraClassPath     c:/spark/external_jars/*
spark.executor.extraClassPath   c:/spark/external_jars/*
  7. Run spark-submit script.py to read data from the Elasticsearch index, i.e. products/kcosmetics (a DataFrame-based alternative is sketched below)
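
With the same jar on the class path, the index can also be read through the DataFrame API using the connector's org.elasticsearch.spark.sql source instead of the low-level Hadoop InputFormat; a sketch (es.nodes/es.port default to a local Elasticsearch on port 9200 and are shown only for clarity):

# read-kcosmetics-df.py - DataFrame read via elasticsearch-hadoop's SQL source.
# Requires the elasticsearch-hadoop jar on the driver/executor class path
# (see the spark-defaults.conf lines above).
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setMaster("local").setAppName("elasticsearch-dataframe")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

df = (sqlContext.read
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "localhost")
      .option("es.port", "9200")
      .load("products/kcosmetics"))

df.printSchema()
print(df.count())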
