spark-class org.apache.spark.deploy.master.Master
spark-class org.apache.spark.deploy.worker.Worker spark://192.168.0.5:7077
#spark-class org.apache.spark.deploy.worker.Worker --cores 2 --memory 4g spark://192.168.0.5:7077
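If the worker registers successfully, it appears under "Workers" on the master's web UI, which by default is served on port 8080 of the master machine (http://192.168.0.5:8080).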
Run the following command in the terminal:
pyspark --master spark://192.168.0.5:7077
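Once the shell starts, a quick way to confirm it is attached to the standalone master rather than a local one is to check sc.master at the prompt:
>>> sc.master   # should echo spark://192.168.0.5:7077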
cd c:/users/yohan/documents/spark
spark-submit --master spark://192.168.0.5:7077 --executor-memory 4g test.py
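test.py itself is not listed here; as a purely illustrative sketch (the file name and contents are assumptions, not the original script), a job as small as this is enough to verify that the submission reaches the cluster:

from pyspark import SparkConf, SparkContext

# trivial job: parallelize a small range and count it
conf = SparkConf().setAppName("test")
sc = SparkContext(conf=conf)
print(sc.parallelize(range(1000)).count())
sc.stop()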
To kill a driver running on the standalone cluster, pass the master URL and the driver ID to the Client class:
spark-class org.apache.spark.deploy.Client kill spark://192.168.0.5:7077 <driver ID>
For comparison, the query was first timed directly in SQL Server:
SET STATISTICS TIME ON
SELECT TOP 10000000
    th.*
FROM Production.TransactionHistory th
JOIN Production.TransactionHistoryArchive tha ON th.Quantity = tha.Quantity
SET STATISTICS TIME OFF
(10000000 row(s) affected)
SQL Server Execution Times:
*CPU time = 3813 ms, elapsed time = 99440 ms.*
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import time

conf = SparkConf().setMaster("local").setAppName("TestingSqlQuery")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# same query as above, wrapped as a subquery so it can be passed as the JDBC "dbtable" option
query = """(
    SELECT TOP 10000000
        th.*
    FROM Production.TransactionHistory th
    JOIN Production.TransactionHistoryArchive tha ON th.Quantity = tha.Quantity
) AS Alias"""

start = time.time()

df = sqlContext.read.format("jdbc").options(
    url="jdbc:sqlserver://localhost;databaseName=AdventureWorks2014;integratedSecurity=true;",
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
    dbtable=query,
    numPartitions=4).load()

df.write.csv('testing-sql-query-result.csv')

end = time.time()
print(str(end - start) + ' seconds')
PS C:\users\yohan\documents\spark> spark-submit testing-sql-query.py
17/05/22 22:06:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
*59.2020001411 seconds*
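One caveat about the read above: the JDBC source only splits the load across multiple connections when partitionColumn, lowerBound and upperBound are supplied alongside numPartitions, so the query is still fetched through a single partition. A sketch of a partitioned variant is below; it assumes the integer TransactionID key exposed by th.* is used as the split column, and the bound values are purely illustrative:

df = sqlContext.read.format("jdbc").options(
    url="jdbc:sqlserver://localhost;databaseName=AdventureWorks2014;integratedSecurity=true;",
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
    dbtable=query,
    partitionColumn="TransactionID",  # assumed integer key column of Production.TransactionHistory
    lowerBound="100000",              # illustrative bounds; use the column's real min/max
    upperBound="200000",
    numPartitions=4).load()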
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("elasticsearch-hadoop")
sc = SparkContext(conf=conf)

# read the ES index/type "products/kcosmetics" as an RDD of (document id, document) pairs
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={"es.resource": "products/kcosmetics"})
print(es_rdd.first())

# keep only the document id and its availability field
kcosmetics_availability = es_rdd.map(lambda item: ("key", {
    'id': item[0],                               # _id from products/kcosmetics
    'availability': item[1]['availability']
}))
# write the results to "products/kcosmetics_stocks"
kcosmetics_availability.saveAsNewAPIHadoopFile(
    path='-',                                    # ignored by EsOutputFormat, but required by the API
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.index.auto.create": "true",          # create the index automatically on first insert
        "es.mapping.id": "id",                   # use the "id" field as the document _id
        "es.resource": "products/kcosmetics_stocks"})
spark.driver.extraClassPath c:/spark/external_jars/*
spark.executor.extraClassPath c:/spark/external_jars/*
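Alternatively, the jar can be supplied per job with spark-submit's --jars option (the jar and script names below are placeholders for illustration):

spark-submit --jars c:/spark/external_jars/elasticsearch-hadoop-5.4.0.jar es_kcosmetics.py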