当我的函数调用另一个函数时,未定义“ sqlContext”

教会

我有一个函数all_purch_spark(),它为五个不同的表设置一个Spark上下文以及SQL Context。然后,相同的函数针对AWS Redshift数据库成功运行sql查询。效果很好。我将在下面包括整个功能(当然会删除敏感数据)。请原谅它的长度,但是鉴于我所面临的问题,我想显示它的长度。

我的问题是第二个函数repurch_prep()及其如何调用第一个函数all_purch_spark()。我不知道如何避免这样的错误:NameError:未定义名称'sqlContext'

我将在下面显示两个功能和错误。

这是第一个函数all_purch_spark()。我再次将整个功能放在这里以供参考。我知道这很长,但不确定是否可以将其简化为有意义的示例。

def all_purch_spark():
    config = {
    'redshift_user': 'tester123',
    'redshift_pass': '*****************',
    'redshift_port': "5999",
    'redshift_db': 'my_database',
    'redshift_host': 'redshift.my_database.me',
    }

    from pyspark import SparkContext, SparkConf, SQLContext
    jars = [
    "/home/spark/SparkNotebooks/src/service/RedshiftJDBC42-no-awssdk-1.2.41.1065.jar"
    ]
    conf = (
        SparkConf()
        .setAppName("S3 with Redshift")
        .set("spark.driver.extraClassPath", ":".join(jars))
        .set("spark.hadoop.fs.s3a.path.style.access", True)
        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .set("com.amazonaws.services.s3.enableV4", True)
        .set("spark.hadoop.fs.s3a.endpoint", f"s3-{config.get('region')}.amazonaws.com")
        .set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
        .set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    )
    sc = SparkContext(conf=conf).getOrCreate()
    sqlContext = SQLContext(sc)

    ##Set Schema and table to query
    schema1 = 'production'
    schema2 = 'X4production'
    table1 = 'purchases'
    table2 = 'customers'
    table3 = 'memberships'
    table4 = 'users'  #set as users table in both schemas

    purchases_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table1}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    customers_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table2}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    memberships_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table3}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    users_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table4}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    cusers_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema2}.{table4}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()
        
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('fc_purchases').getOrCreate()

    purchases_df.createOrReplaceTempView('purchases')
    customers_df.createOrReplaceTempView('customers')
    memberships_df.createOrReplaceTempView('memberships')
    users_df.createOrReplaceTempView('users')
    cusers_df.createOrReplaceTempView('cusers')

    all_purch = spark.sql("SELECT \
    p_paid.customer_id AS p_paid_user_id \
    ,p_trial.created_at AS trial_start_date \
    ,p_paid.created_at \
    ,cu.graduation_year \
    ,lower(cu.student_year) AS student_year \
    ,lower(p_paid.description) as product \
    ,u.email \
    ,u.id AS u_user_id \
    ,cu.id AS cu_user_id \
    FROM \
    purchases AS p_paid \
    INNER JOIN purchases AS p_trial ON p_trial.customer_id = p_paid.customer_id \
    INNER JOIN customers AS c on c.id = p_paid.customer_id \
    INNER JOIN memberships AS m on m.id = c.membership_id \
    INNER JOIN users AS u on u.id = m.user_id \
    INNER JOIN cusers AS cu on cu.id = u.id \
    WHERE \
    p_trial.created_at >= '2018-03-01' \
    AND p_paid.created_at >= '2018-03-01' \
    AND u.institution_contract = false \
    AND LOWER(u.email) not like '%hotmail.me%' \
    AND LOWER(u.email) not like '%gmail.com%' \
    AND p_trial.description like '% Day Free Trial' \
    AND p_paid.status = 'paid' \
    GROUP BY \
    p_paid_user_id \
    ,trial_start_date \
    ,p_paid.created_at \
    ,u.email \
    ,cu.graduation_year \
    ,student_year \
    ,product \
    ,cu_user_id \
    ,u_user_id \
    ORDER BY p_paid_user_id") 
    all_purch.registerTempTable("all_purch_table")
   
    return all_purch    

这是调用上述功能的第二个功能。应该根据上面功能中设置的注册表视图进行选择:

    def repurch_prep():
        all_purch_spark()
        all_repurch = sqlContext.sql("SELECT * FROM all_purch_table WHERE p_paid_user_id IN \
        (SELECT p_paid_user_id FROM all_purch_table GROUP BY p_paid_user_id HAVING COUNT(*) > 1) \
        ORDER BY p_paid_user_id ASC")
        return all_repurch

当我运行repurch_prep()时,即使上述函数中定义了SQL上下文,它也会引发以下异常。我试图返回上面的值,但无法弄清楚如何使其工作:

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
 in 
----> 1 repurch_prep()

~/spark/SparkNotebooks/firecracker/utils_prod_db_spark.py in repurch_prep()
    735     #sc = SparkContext().getOrCreate()
    736     #sqlContext = SQLContext()
--> 737     all_repurch = sqlContext.sql("SELECT * FROM all_purch_table WHERE p_paid_user_id IN \
    738     (SELECT p_paid_user_id FROM all_purch_table GROUP BY p_paid_user_id HAVING COUNT(*) > 1) \
    739     ORDER BY p_paid_user_id ASC")

NameError: name 'sqlContext' is not defined

任何帮助,不胜感激。

教会

每个@Lamanus的解决方案是将变量放置在函数之外,使它们成为全局变量,而不是将它们存储在一个函数中(就像我所做的那样),然后从另一个函数中调用该函数。

############### SPARK REDSHIFT GLOBAL CONFIG #####################
    config = {
    'redshift_user': 'tester123',
    'redshift_pass': '*****************',
    'redshift_port': "5999",
    'redshift_db': 'my_database',
    'redshift_host': 'redshift.my_database.me',
    }

    from pyspark import SparkContext, SparkConf, SQLContext
    jars = [
    "/home/spark/SparkNotebooks/src/service/RedshiftJDBC42-no-awssdk-1.2.41.1065.jar"
    ]
    conf = (
        SparkConf()
        .setAppName("S3 with Redshift")
        .set("spark.driver.extraClassPath", ":".join(jars))
        .set("spark.hadoop.fs.s3a.path.style.access", True)
        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .set("com.amazonaws.services.s3.enableV4", True)
        .set("spark.hadoop.fs.s3a.endpoint", f"s3-{config.get('region')}.amazonaws.com")
        .set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
        .set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    )
    sc = SparkContext(conf=conf).getOrCreate()
###############################################################


def all_purch_spark():
    sqlContext = SQLContext(sc)

    ##Set Schema and table to query
    schema1 = 'production'
    schema2 = 'X4production'
    table1 = 'purchases'
    table2 = 'customers'
    table3 = 'memberships'
    table4 = 'users'  #set as users table in both schemas

    purchases_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table1}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    customers_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table2}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    memberships_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table3}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    users_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table4}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    cusers_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema2}.{table4}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()
        
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('fc_purchases').getOrCreate()

    purchases_df.createOrReplaceTempView('purchases')
    customers_df.createOrReplaceTempView('customers')
    memberships_df.createOrReplaceTempView('memberships')
    users_df.createOrReplaceTempView('users')
    cusers_df.createOrReplaceTempView('cusers')

    all_purch = spark.sql("SELECT \
    p_paid.customer_id AS p_paid_user_id \
    ,p_trial.created_at AS trial_start_date \
    ,p_paid.created_at \
    ,cu.graduation_year \
    ,lower(cu.student_year) AS student_year \
    ,lower(p_paid.description) as product \
    ,u.email \
    ,u.id AS u_user_id \
    ,cu.id AS cu_user_id \
    FROM \
    purchases AS p_paid \
    INNER JOIN purchases AS p_trial ON p_trial.customer_id = p_paid.customer_id \
    INNER JOIN customers AS c on c.id = p_paid.customer_id \
    INNER JOIN memberships AS m on m.id = c.membership_id \
    INNER JOIN users AS u on u.id = m.user_id \
    INNER JOIN cusers AS cu on cu.id = u.id \
    WHERE \
    p_trial.created_at >= '2018-03-01' \
    AND p_paid.created_at >= '2018-03-01' \
    AND u.institution_contract = false \
    AND LOWER(u.email) not like '%hotmail.me%' \
    AND LOWER(u.email) not like '%gmail.com%' \
    AND p_trial.description like '% Day Free Trial' \
    AND p_paid.status = 'paid' \
    GROUP BY \
    p_paid_user_id \
    ,trial_start_date \
    ,p_paid.created_at \
    ,u.email \
    ,cu.graduation_year \
    ,student_year \
    ,product \
    ,cu_user_id \
    ,u_user_id \
    ORDER BY p_paid_user_id") 
    all_purch.registerTempTable("all_purch_table")
   
    return all_purch    

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

创建 SQLContext 对象时构造函数 HiveContext(JavaSparkContext) 是未定义错误

来自分类Dev

当我使用预定义函数定义另一个函数时,RaiseError被调用

来自分类Dev

从另一个类调用函数时属性未定义

来自分类Dev

从另一个类调用函数时属性未定义

来自分类Dev

JavaScript表示未定义被另一个函数调用的编程函数

来自分类Dev

在另一个函数中调用时未定义函数(Python)

来自分类Dev

为什么我不能从 python 中的另一个函数调用一个函数。函数未定义错误

来自分类Dev

nodejs-在另一个js文件中打印未定义的调用函数

来自分类常见问题

传递给另一个.js文件中的函数时,属性未定义

来自分类Dev

SQLContext 与 DataFrameLoader

来自分类Dev

隐式超级构造函数Person()未定义。必须显式调用另一个构造函数?

来自分类Dev

如果通过调用另一个noreturn函数返回,则从noreturn函数返回的行为是否未定义?

来自分类Dev

如果通过调用另一个noreturn函数返回,则从noreturn函数返回的行为是否未定义?

来自分类Dev

为另一个函数中使用的函数获取未定义

来自分类Dev

在另一个函数React返回的匿名函数中,“ this”是未定义的

来自分类Dev

当我使用window.location.href时,那么我的另一个函数没有调用。

来自分类Dev

为什么我在React中的状态在一个函数中定义而在另一个函数中未定义?

来自分类Dev

当我访问另一个状态时停止一个函数

来自分类Dev

当我从按钮调用对象时,将其按参数发送给另一个函数

来自分类Dev

为什么当我在一个函数中定义变量时,却无法在另一个函数中访问它

来自分类Dev

为什么我的方法不调用另一个(未定义的变量)

来自分类Dev

当从另一个上下文中调用方法时,“ this”是未定义的

来自分类Dev

在具有来自另一个文件的内部数组的函数中未定义

来自分类Dev

另一个:未定义的未捕获的typeerror不是函数

来自分类Dev

对作为另一个类成员的函数的未定义引用

来自分类Dev

当我在同一click事件中调用另一个函数时,javascript函数未从背后的代码调用

来自分类Dev

从另一个函数调用函数时出错

来自分类Dev

如果我正在读取JSON字符串,则SQLContext.createDataframe(RDD,StructType)与SQLContext.read()。schema(StructType).json(RDD)之间的区别?

来自分类Dev

C标头中的一个函数的“未定义的引用”,而另一个函数中的“未定义的引用”

Related 相关文章

  1. 1

    创建 SQLContext 对象时构造函数 HiveContext(JavaSparkContext) 是未定义错误

  2. 2

    当我使用预定义函数定义另一个函数时,RaiseError被调用

  3. 3

    从另一个类调用函数时属性未定义

  4. 4

    从另一个类调用函数时属性未定义

  5. 5

    JavaScript表示未定义被另一个函数调用的编程函数

  6. 6

    在另一个函数中调用时未定义函数(Python)

  7. 7

    为什么我不能从 python 中的另一个函数调用一个函数。函数未定义错误

  8. 8

    nodejs-在另一个js文件中打印未定义的调用函数

  9. 9

    传递给另一个.js文件中的函数时,属性未定义

  10. 10

    SQLContext 与 DataFrameLoader

  11. 11

    隐式超级构造函数Person()未定义。必须显式调用另一个构造函数?

  12. 12

    如果通过调用另一个noreturn函数返回,则从noreturn函数返回的行为是否未定义?

  13. 13

    如果通过调用另一个noreturn函数返回,则从noreturn函数返回的行为是否未定义?

  14. 14

    为另一个函数中使用的函数获取未定义

  15. 15

    在另一个函数React返回的匿名函数中,“ this”是未定义的

  16. 16

    当我使用window.location.href时,那么我的另一个函数没有调用。

  17. 17

    为什么我在React中的状态在一个函数中定义而在另一个函数中未定义?

  18. 18

    当我访问另一个状态时停止一个函数

  19. 19

    当我从按钮调用对象时,将其按参数发送给另一个函数

  20. 20

    为什么当我在一个函数中定义变量时,却无法在另一个函数中访问它

  21. 21

    为什么我的方法不调用另一个(未定义的变量)

  22. 22

    当从另一个上下文中调用方法时,“ this”是未定义的

  23. 23

    在具有来自另一个文件的内部数组的函数中未定义

  24. 24

    另一个:未定义的未捕获的typeerror不是函数

  25. 25

    对作为另一个类成员的函数的未定义引用

  26. 26

    当我在同一click事件中调用另一个函数时,javascript函数未从背后的代码调用

  27. 27

    从另一个函数调用函数时出错

  28. 28

    如果我正在读取JSON字符串,则SQLContext.createDataframe(RDD,StructType)与SQLContext.read()。schema(StructType).json(RDD)之间的区别?

  29. 29

    C标头中的一个函数的“未定义的引用”,而另一个函数中的“未定义的引用”

热门标签

归档