我有一个函数all_purch_spark(),它为五个不同的表设置一个Spark上下文以及SQL Context。然后,相同的函数针对AWS Redshift数据库成功运行sql查询。效果很好。我将在下面包括整个功能(当然会删除敏感数据)。请原谅它的长度,但是鉴于我所面临的问题,我想显示它的长度。
我的问题是第二个函数repurch_prep()及其如何调用第一个函数all_purch_spark()。我不知道如何避免这样的错误:NameError:未定义名称'sqlContext'
我将在下面显示两个功能和错误。
这是第一个函数all_purch_spark()。我再次将整个功能放在这里以供参考。我知道这很长,但不确定是否可以将其简化为有意义的示例。
def all_purch_spark():
config = {
'redshift_user': 'tester123',
'redshift_pass': '*****************',
'redshift_port': "5999",
'redshift_db': 'my_database',
'redshift_host': 'redshift.my_database.me',
}
from pyspark import SparkContext, SparkConf, SQLContext
jars = [
"/home/spark/SparkNotebooks/src/service/RedshiftJDBC42-no-awssdk-1.2.41.1065.jar"
]
conf = (
SparkConf()
.setAppName("S3 with Redshift")
.set("spark.driver.extraClassPath", ":".join(jars))
.set("spark.hadoop.fs.s3a.path.style.access", True)
.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.set("com.amazonaws.services.s3.enableV4", True)
.set("spark.hadoop.fs.s3a.endpoint", f"s3-{config.get('region')}.amazonaws.com")
.set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
.set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
)
sc = SparkContext(conf=conf).getOrCreate()
sqlContext = SQLContext(sc)
##Set Schema and table to query
schema1 = 'production'
schema2 = 'X4production'
table1 = 'purchases'
table2 = 'customers'
table3 = 'memberships'
table4 = 'users' #set as users table in both schemas
purchases_df = sqlContext.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
.option("dbtable", f"{schema1}.{table1}") \
.option("user", config.get('redshift_user')) \
.option("password", config.get('redshift_pass')) \
.load()
customers_df = sqlContext.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
.option("dbtable", f"{schema1}.{table2}") \
.option("user", config.get('redshift_user')) \
.option("password", config.get('redshift_pass')) \
.load()
memberships_df = sqlContext.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
.option("dbtable", f"{schema1}.{table3}") \
.option("user", config.get('redshift_user')) \
.option("password", config.get('redshift_pass')) \
.load()
users_df = sqlContext.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
.option("dbtable", f"{schema1}.{table4}") \
.option("user", config.get('redshift_user')) \
.option("password", config.get('redshift_pass')) \
.load()
cusers_df = sqlContext.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
.option("dbtable", f"{schema2}.{table4}") \
.option("user", config.get('redshift_user')) \
.option("password", config.get('redshift_pass')) \
.load()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('fc_purchases').getOrCreate()
purchases_df.createOrReplaceTempView('purchases')
customers_df.createOrReplaceTempView('customers')
memberships_df.createOrReplaceTempView('memberships')
users_df.createOrReplaceTempView('users')
cusers_df.createOrReplaceTempView('cusers')
all_purch = spark.sql("SELECT \
p_paid.customer_id AS p_paid_user_id \
,p_trial.created_at AS trial_start_date \
,p_paid.created_at \
,cu.graduation_year \
,lower(cu.student_year) AS student_year \
,lower(p_paid.description) as product \
,u.email \
,u.id AS u_user_id \
,cu.id AS cu_user_id \
FROM \
purchases AS p_paid \
INNER JOIN purchases AS p_trial ON p_trial.customer_id = p_paid.customer_id \
INNER JOIN customers AS c on c.id = p_paid.customer_id \
INNER JOIN memberships AS m on m.id = c.membership_id \
INNER JOIN users AS u on u.id = m.user_id \
INNER JOIN cusers AS cu on cu.id = u.id \
WHERE \
p_trial.created_at >= '2018-03-01' \
AND p_paid.created_at >= '2018-03-01' \
AND u.institution_contract = false \
AND LOWER(u.email) not like '%hotmail.me%' \
AND LOWER(u.email) not like '%gmail.com%' \
AND p_trial.description like '% Day Free Trial' \
AND p_paid.status = 'paid' \
GROUP BY \
p_paid_user_id \
,trial_start_date \
,p_paid.created_at \
,u.email \
,cu.graduation_year \
,student_year \
,product \
,cu_user_id \
,u_user_id \
ORDER BY p_paid_user_id")
all_purch.registerTempTable("all_purch_table")
return all_purch
这是调用上述功能的第二个功能。应该根据上面功能中设置的注册表视图进行选择:
def repurch_prep():
all_purch_spark()
all_repurch = sqlContext.sql("SELECT * FROM all_purch_table WHERE p_paid_user_id IN \
(SELECT p_paid_user_id FROM all_purch_table GROUP BY p_paid_user_id HAVING COUNT(*) > 1) \
ORDER BY p_paid_user_id ASC")
return all_repurch
当我运行repurch_prep()时,即使上述函数中定义了SQL上下文,它也会引发以下异常。我试图返回上面的值,但无法弄清楚如何使其工作:
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
in
----> 1 repurch_prep()
~/spark/SparkNotebooks/firecracker/utils_prod_db_spark.py in repurch_prep()
735 #sc = SparkContext().getOrCreate()
736 #sqlContext = SQLContext()
--> 737 all_repurch = sqlContext.sql("SELECT * FROM all_purch_table WHERE p_paid_user_id IN \
738 (SELECT p_paid_user_id FROM all_purch_table GROUP BY p_paid_user_id HAVING COUNT(*) > 1) \
739 ORDER BY p_paid_user_id ASC")
NameError: name 'sqlContext' is not defined
任何帮助,不胜感激。
每个@Lamanus的解决方案是将变量放置在函数之外,使它们成为全局变量,而不是将它们存储在一个函数中(就像我所做的那样),然后从另一个函数中调用该函数。
############### SPARK REDSHIFT GLOBAL CONFIG #####################
config = {
'redshift_user': 'tester123',
'redshift_pass': '*****************',
'redshift_port': "5999",
'redshift_db': 'my_database',
'redshift_host': 'redshift.my_database.me',
}
from pyspark import SparkContext, SparkConf, SQLContext
jars = [
"/home/spark/SparkNotebooks/src/service/RedshiftJDBC42-no-awssdk-1.2.41.1065.jar"
]
conf = (
SparkConf()
.setAppName("S3 with Redshift")
.set("spark.driver.extraClassPath", ":".join(jars))
.set("spark.hadoop.fs.s3a.path.style.access", True)
.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.set("com.amazonaws.services.s3.enableV4", True)
.set("spark.hadoop.fs.s3a.endpoint", f"s3-{config.get('region')}.amazonaws.com")
.set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
.set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
)
sc = SparkContext(conf=conf).getOrCreate()
###############################################################
def all_purch_spark():
sqlContext = SQLContext(sc)
##Set Schema and table to query
schema1 = 'production'
schema2 = 'X4production'
table1 = 'purchases'
table2 = 'customers'
table3 = 'memberships'
table4 = 'users' #set as users table in both schemas
purchases_df = sqlContext.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
.option("dbtable", f"{schema1}.{table1}") \
.option("user", config.get('redshift_user')) \
.option("password", config.get('redshift_pass')) \
.load()
customers_df = sqlContext.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
.option("dbtable", f"{schema1}.{table2}") \
.option("user", config.get('redshift_user')) \
.option("password", config.get('redshift_pass')) \
.load()
memberships_df = sqlContext.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
.option("dbtable", f"{schema1}.{table3}") \
.option("user", config.get('redshift_user')) \
.option("password", config.get('redshift_pass')) \
.load()
users_df = sqlContext.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
.option("dbtable", f"{schema1}.{table4}") \
.option("user", config.get('redshift_user')) \
.option("password", config.get('redshift_pass')) \
.load()
cusers_df = sqlContext.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
.option("dbtable", f"{schema2}.{table4}") \
.option("user", config.get('redshift_user')) \
.option("password", config.get('redshift_pass')) \
.load()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('fc_purchases').getOrCreate()
purchases_df.createOrReplaceTempView('purchases')
customers_df.createOrReplaceTempView('customers')
memberships_df.createOrReplaceTempView('memberships')
users_df.createOrReplaceTempView('users')
cusers_df.createOrReplaceTempView('cusers')
all_purch = spark.sql("SELECT \
p_paid.customer_id AS p_paid_user_id \
,p_trial.created_at AS trial_start_date \
,p_paid.created_at \
,cu.graduation_year \
,lower(cu.student_year) AS student_year \
,lower(p_paid.description) as product \
,u.email \
,u.id AS u_user_id \
,cu.id AS cu_user_id \
FROM \
purchases AS p_paid \
INNER JOIN purchases AS p_trial ON p_trial.customer_id = p_paid.customer_id \
INNER JOIN customers AS c on c.id = p_paid.customer_id \
INNER JOIN memberships AS m on m.id = c.membership_id \
INNER JOIN users AS u on u.id = m.user_id \
INNER JOIN cusers AS cu on cu.id = u.id \
WHERE \
p_trial.created_at >= '2018-03-01' \
AND p_paid.created_at >= '2018-03-01' \
AND u.institution_contract = false \
AND LOWER(u.email) not like '%hotmail.me%' \
AND LOWER(u.email) not like '%gmail.com%' \
AND p_trial.description like '% Day Free Trial' \
AND p_paid.status = 'paid' \
GROUP BY \
p_paid_user_id \
,trial_start_date \
,p_paid.created_at \
,u.email \
,cu.graduation_year \
,student_year \
,product \
,cu_user_id \
,u_user_id \
ORDER BY p_paid_user_id")
all_purch.registerTempTable("all_purch_table")
return all_purch
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句