This snippet monitors two directories and joins the data from them whenever a new CSV file arrives in either directory.
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext()
sqlContext = SQLContext.getOrCreate(sc)

# Schema of the mapping CSV files dropped into /tmp/pl/
pl_schema = StructType([
    StructField('id', LongType(), True),
    StructField('gid', LongType(), True),
    StructField('pid', LongType(), True),
    StructField('firstlogin', IntegerType(), True)
])
pl_df = sqlContext.readStream.schema(pl_schema).csv('/tmp/pl/')
pl_df.createOrReplaceTempView('pl_mapping')

# Schema of the user CSV files dropped into /tmp/user/
user_schema = StructType([
    StructField('id', LongType(), True),
    StructField('fullname', StringType(), True)
])
user_df = sqlContext.readStream.schema(user_schema).csv('/tmp/user/')
user_df.createOrReplaceTempView('users')

# Stream-stream inner join, expressed in Spark SQL,
# matching users.id against pl_mapping.gid
result = sqlContext.sql(
    "SELECT u.id, u.fullname FROM users AS u "
    "JOIN pl_mapping AS pl ON u.id = pl.gid")

# Write the joined rows out as CSV, with a checkpoint for recovery
query = result.writeStream.outputMode('append') \
    .format('csv') \
    .option('path', '/tmp/result/') \
    .option('checkpointLocation', '/tmp/ckpt/') \
    .start()

print('Starting')
query.awaitTermination(3600)
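To try the snippet locally you can drop headerless CSV files that match the two schemas into the watched directories. A minimal sketch, where the file names and sample rows are made up for illustration:

import os

os.makedirs('/tmp/user', exist_ok=True)
os.makedirs('/tmp/pl', exist_ok=True)

# columns: id, fullname
with open('/tmp/user/users_001.csv', 'w') as f:
    f.write('1,Alice\n2,Bob\n')

# columns: id, gid, pid, firstlogin
with open('/tmp/pl/pl_001.csv', 'w') as f:
    f.write('10,1,100,1\n11,2,101,0\n')

Each new file triggers a micro-batch, and matching rows should appear as CSV part files under /tmp/result/.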
The join is expressed in Spark SQL, which is easy to write (especially for anyone with a DBA background) and easy to maintain.
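For reference, the same inner join could also be written with the DataFrame API instead of temp views plus SQL; this is only an equivalent sketch of the query above, reusing user_df and pl_df from the snippet:

# DataFrame-API formulation of the same inner join
result = user_df.join(pl_df, user_df.id == pl_df.gid, 'inner') \
                .select(user_df.id, user_df.fullname)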
Some articles say that if the Spark process restarts after a failure, the checkpoint lets it resume from the last uncompleted position. I tried this on my local machine and noticed that it does produce some duplicated rows after a restart. This would be a severe problem in a production environment, so I will investigate it in the next round of testing.
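One way to check for the duplicates is to read the sink directory back as a batch DataFrame and compare the row count before and after dropDuplicates. A minimal sketch, assuming the output path and columns from the snippet above:

# Batch read of the streaming sink to look for duplicated rows
out_schema = StructType([
    StructField('id', LongType(), True),
    StructField('fullname', StringType(), True)
])
out_df = sqlContext.read.schema(out_schema).csv('/tmp/result/')
total = out_df.count()
distinct = out_df.dropDuplicates().count()
print('total rows:', total, 'distinct rows:', distinct)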