Hi Sudheer,

Hope you are doing well.

These kinds of joins are best done through the Spark SQL and Hive integration, because with RDDs you cannot refer to columns by name; an RDD's fields are purely positional, so column names are, in effect, auto-generated.

Only simple joins are practical with plain RDDs (a minimal sketch is shown below for contrast), so please use the Spark SQL code that follows.
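
For reference, here is what a join with plain RDDs looks like. This is only a minimal sketch, assuming the same tab-delimited files used below; note how the RDD API joins on (key, value) pairs and the result is nested tuples with no column names:

val custsRDD = sc.textFile("file:///home/edureka/Desktop/custs")
  .map(_.split("\t"))
  .map(f => (f(0).toInt, f(1)))                   // (cust_id, name)

val txnsRDD = sc.textFile("file:///home/edureka/Desktop/custs_txns")
  .map(_.split("\t"))
  .map(f => (f(0).toInt, (f(1), f(2).toFloat)))   // (cust_id, (store_id, amount))

custsRDD.join(txnsRDD).collect().foreach(println) // (cust_id, (name, (store_id, amount)))

Anything beyond this (selecting columns by name, multi-way joins) gets awkward quickly, which is why Spark SQL is the better route.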


custs (tab-delimited):

1	Vishal
2	Vijay
3	Vinay

custs_txns (tab-delimited):

1	001	1563.47
1	001	1589.93
1	002	2345.98
2	001	8765.73
2	001	7642.62
4	001	2345.93

CODE:

sqlContext.sql("CREATE TABLE IF NOT EXISTS custs (cust_id INT, name STRING) row format delimited fields terminated by '\t'")

sqlContext.sql("LOAD DATA LOCAL INPATH '/home/edureka/Desktop/custs' INTO table custs");

sqlContext.sql("CREATE TABLE IF NOT EXISTS custs_txns (cust_id INT, store_id String, amount Float) row format delimited fields terminated by '\t'")

sqlContext.sql("LOAD DATA LOCAL INPATH '/home/edureka/Desktop/custs_txns' INTO table custs_txns");

sqlContext.sql("select * from custs").toDF.show()

sqlContext.sql("select * from custs_txns").toDF.show()

sqlContext.sql("SELECT C.cust_id,C.name,T.amount,T.store_id FROM custs C,  custs_txns T WHERE C.cust_id = T.cust_id").toDF.show()



For reading a CSV file:

==> Open the terminal and type spark-shell --packages com.databricks:spark-csv_2.10:1.5.0. Once you get the Scala shell, run the code below to read the file.

val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("file:///home/edureka/Desktop/StockPrices.csv")
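
Once df is loaded, you can verify the schema and query it with SQL. The column names come from the CSV header, and stock_prices below is just an illustrative temp-table name:

df.printSchema()                      // check the columns and inferred types
df.show(5)                            // preview the first five rows
df.registerTempTable("stock_prices")  // expose the DataFrame to sqlContext.sql
sqlContext.sql("SELECT COUNT(*) FROM stock_prices").show()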




Please try this and let us know if you face any issues.

We are eagerly waiting for your response.