This is a follow-up to a question I asked yesterday.

I have table1, which tracks the implementation status of products (feature column). Unfortunately, the table does not contain rows for features that have not been implemented yet, and I would like to add those rows. There is also table2, which contains an id; the join key is table2.id = table1.xml_id.

In short, I want to:

- For every table2.id not in table1, assign a no_contact status for each feature that user has no status for in table1.
- For xml_ids that already exist in table1 but lack a status for a specific feature, assign them a no_contact status for that feature.

That way I will end up with one row per table2.id and table1.feature, each with the correct status.

The complete list of features can be found in table3 or in distinct(table2.feature). I initially tried to solve this with raw HQL, e.g.:
select
    distinct(f.feature),
    p.id as xml_id,
    "no_contact" as status
from
    table3 f
cross join
    table2 p
where
    id in (
        select
            p.id
        from
            table2 p
        left join (
            select
                xml_id
            from
                table1
            where
                yyyy_mm_dd = '2020-05-30'
        ) s on s.xml_id = p.id
        where
            s.xml_id is null
    )
group by
    1, 2, 3
union all
select
    feature,
    xml_id,
    status
from
    table1
where
    yyyy_mm_dd = '2020-05-30'
The above almost works. The problem is that the no_contact rows come out identical for every feature, rather than being the correct set per feature. I thought this might be easier to solve in Python with Pandas, so I pulled the data in via PySpark.
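(For reference, the duplication can also be fixed in SQL itself: the subquery above only finds ids that are entirely absent from table1, so joining on the feature as well as the id is what produces one no_contact row per missing pair. A hedged sketch of that idea, illustrated with SQLite and a trimmed-down copy of the sample data rather than the real Hive tables:)

```python
# Sketch (assumption): left-joining table1 on BOTH xml_id and feature yields one
# no_contact row per missing (id, feature) pair; the date filter is omitted here.
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE table1 (yyyy_mm_dd TEXT, xml_id INTEGER, feature TEXT, status TEXT);
INSERT INTO table1 VALUES
  ('2020-07-10', 2, 'basic', 'implemented'),
  ('2020-07-10', 1, 'geo', 'first_contact');
CREATE TABLE table2 (id INTEGER, name TEXT, active INTEGER);
INSERT INTO table2 VALUES (1, 'xyz', 1), (2, 'dfg', 1), (3, 'lki', 1);
CREATE TABLE table3 (feature TEXT, metric TEXT, app TEXT);
INSERT INTO table3 VALUES ('basic', 'b_read', 'promotions'), ('geo', 'g_write', 'admin');
""")

rows = con.execute("""
    SELECT f.feature, p.id AS xml_id, 'no_contact' AS status
    FROM (SELECT DISTINCT feature FROM table3) f
    CROSS JOIN table2 p
    LEFT JOIN table1 s
      ON s.xml_id = p.id AND s.feature = f.feature
    WHERE s.xml_id IS NULL
    UNION ALL
    SELECT feature, xml_id, status FROM table1
""").fetchall()
```

With two features and three ids this produces all six (id, feature) pairs: the two rows already in table1 keep their status and the four missing pairs get no_contact.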
Here is the table1 data that already has a status:
has_status = spark.sql("""
select
yyyy_mm_dd,
xml_id,
feature,
status
from
table1
where
yyyy_mm_dd = '2020-05-30'
""")
has_status = has_status.toPandas()
has_status
And here is the data for all table2.ids:
all_ids = spark.sql("""
select
    p.id
from
    table2 p
""")
all_ids = all_ids.toPandas()
all_ids
Is it possible to achieve this in Python? In other words, if a table2.id does not have a row per feature in has_status, then I would like to add that id to has_status and assign it a status of no_contact. Additionally, it would be great if I could also add a no_contact status for users who are already in table1 but are missing a status for a specific feature. Sample data / schema:
DROP TABLE IF EXISTS table1;
CREATE TABLE table1 (
`yyyy_mm_dd` DATE,
`xml_id` INTEGER,
`feature` VARCHAR(31),
`status` VARCHAR(31)
);
INSERT INTO table1
(yyyy_mm_dd, xml_id, feature, status)
VALUES
('2020-07-10', '2', 'basic', 'implemented'),
('2020-07-10', '2', 'geo', 'implemented'),
('2020-07-10', '2', 'mobile', 'first_contact'),
('2020-07-10', '1', 'geo', 'first_contact'),
('2020-07-10', '1', 'mobile', 'implemented'),
('2020-07-10', '3', 'basic', 'first_contact'),
('2020-07-10', '3', 'geo', 'implemented')
;
DROP TABLE IF EXISTS table2;
CREATE TABLE table2 (
`id` INTEGER,
`name` VARCHAR(3),
`active` INTEGER
);
INSERT INTO table2
(`id`, `name`, `active`)
VALUES
('1', 'xyz', '1'),
('2', 'dfg', '1'),
('3', 'lki', '1'),
('4', 'nbg', '0'),
('5', 'qyt', '0'),
('6', 'bfh', '1');
DROP TABLE IF EXISTS table3;
CREATE TABLE table3 (
`feature` VARCHAR(20),
`metric` VARCHAR(20),
`app` VARCHAR(20)
);
INSERT INTO table3
(`feature`, `metric`, `app`)
VALUES
('basic', 'basic_read', 'promotions'),
('basic', 'basic_update', 'promotions'),
('basic', 'basic_write', 'promotions'),
('geo', 'geo_update', 'admin'),
('geo', 'geo_write', 'admin'),
('mobile', 'mobile_executed', 'admin');
Based on the sample data above, the expected output df would look like this:
| yyyy_mm_dd | xml_id | feature | status |
|------------|--------|---------|---------------|
| 2020-07-10 | 2 | basic | implemented |
| 2020-07-10 | 2 | geo | implemented |
| 2020-07-10 | 2 | mobile | first_contact |
| 2020-07-10 | 1 | geo | first_contact |
| 2020-07-10 | 1 | mobile | implemented |
| 2020-07-10 | 3 | basic | first_contact |
| 2020-07-10 | 3 | geo | implemented |
| 2020-07-10 | 4 | mobile | no_contact |
| 2020-07-10 | 4 | geo | no_contact |
| 2020-07-10 | 4 | basic | no_contact |
| 2020-07-10 | 5 | mobile | no_contact |
| 2020-07-10 | 5 | geo | no_contact |
| 2020-07-10 | 5 | basic | no_contact |
| 2020-07-10 | 1 | basic | no_contact |
| 2020-07-10 | 3 | mobile | no_contact |
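One way to get there directly in pandas (a sketch, assuming has_status and all_ids have been pulled in as shown above, and pandas >= 1.2 for merge(how="cross")): build the full id x feature grid with a cross merge, left-merge the existing statuses onto it, and fill the gaps with no_contact. The small frames below stand in for the real has_status / all_ids data.

```python
import pandas as pd

# Stand-ins for has_status / all_ids (assumption: same shapes as the frames above)
has_status = pd.DataFrame({
    "yyyy_mm_dd": ["2020-07-10"] * 3,
    "xml_id": [2, 2, 1],
    "feature": ["basic", "geo", "geo"],
    "status": ["implemented", "implemented", "first_contact"],
})
all_ids = pd.DataFrame({"id": [1, 2, 3]})
features = pd.DataFrame({"feature": ["basic", "geo"]})  # e.g. from table3

# Full grid: one row per (id, feature) pair
grid = all_ids.rename(columns={"id": "xml_id"}).merge(features, how="cross")

# Attach the known statuses, then default every missing pair to no_contact
df = grid.merge(has_status, on=["xml_id", "feature"], how="left")
df["status"] = df["status"].fillna("no_contact")
df["yyyy_mm_dd"] = df["yyyy_mm_dd"].fillna("2020-07-10")
```

This covers both cases at once: ids absent from table1 get no_contact for every feature, and ids present in table1 get no_contact only for the features they are missing.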
Here is an approach using PySpark.
import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.partitionBy()

df2.selectExpr("id as xml_id") \
    .crossJoin(df3.select('feature').distinct()) \
    .join(df1, ['xml_id', 'feature'], 'left') \
    .withColumn('yyyy_mm_dd', f.max('yyyy_mm_dd').over(w)) \
    .withColumn('status', f.expr("coalesce(status, 'no_contact')")) \
    .orderBy('xml_id', 'feature') \
    .show(20, False)
+------+-------+----------+-------------+
|xml_id|feature|yyyy_mm_dd|status       |
+------+-------+----------+-------------+
|1     |basic  |2020-07-10|no_contact   |
|1     |geo    |2020-07-10|first_contact|
|1     |mobile |2020-07-10|implemented  |
|2     |basic  |2020-07-10|implemented  |
|2     |geo    |2020-07-10|implemented  |
|2     |mobile |2020-07-10|first_contact|
|3     |basic  |2020-07-10|first_contact|
|3     |geo    |2020-07-10|implemented  |
|3     |mobile |2020-07-10|no_contact   |
|4     |basic  |2020-07-10|no_contact   |
|4     |geo    |2020-07-10|no_contact   |
|4     |mobile |2020-07-10|no_contact   |
|5     |basic  |2020-07-10|no_contact   |
|5     |geo    |2020-07-10|no_contact   |
|5     |mobile |2020-07-10|no_contact   |
|6     |basic  |2020-07-10|no_contact   |
|6     |geo    |2020-07-10|no_contact   |
|6     |mobile |2020-07-10|no_contact   |
+------+-------+----------+-------------+
The dataframes are:
df1 = spark.createDataFrame([ ('2020-07-10', '2', 'basic', 'implemented'),
('2020-07-10', '2', 'geo', 'implemented'),
('2020-07-10', '2', 'mobile', 'first_contact'),
('2020-07-10', '1', 'geo', 'first_contact'),
('2020-07-10', '1', 'mobile', 'implemented'),
('2020-07-10', '3', 'basic', 'first_contact'),
('2020-07-10', '3', 'geo', 'implemented')], ['yyyy_mm_dd', 'xml_id', 'feature', 'status'])
df1.show(10, False)
+----------+------+-------+-------------+
|yyyy_mm_dd|xml_id|feature|status |
+----------+------+-------+-------------+
|2020-07-10|2 |basic |implemented |
|2020-07-10|2 |geo |implemented |
|2020-07-10|2 |mobile |first_contact|
|2020-07-10|1 |geo |first_contact|
|2020-07-10|1 |mobile |implemented |
|2020-07-10|3 |basic |first_contact|
|2020-07-10|3 |geo |implemented |
+----------+------+-------+-------------+
df2 = spark.createDataFrame([ ('1', 'xyz', '1'),
('2', 'dfg', '1'),
('3', 'lki', '1'),
('4', 'nbg', '0'),
('5', 'qyt', '0'),
('6', 'bfh', '1')], ['id', 'name', 'active'])
df2.show(10, False)
+---+----+------+
|id |name|active|
+---+----+------+
|1 |xyz |1 |
|2 |dfg |1 |
|3 |lki |1 |
|4 |nbg |0 |
|5 |qyt |0 |
|6 |bfh |1 |
+---+----+------+
df3 = spark.createDataFrame([ ('basic', 'basic_read', 'promotions'),
('basic', 'basic_update', 'promotions'),
('basic', 'basic_write', 'promotions'),
('geo', 'geo_update', 'admin'),
('geo', 'geo_write', 'admin'),
('mobile', 'mobile_executed', 'admin')], ['feature', 'metric', 'app'])
df3.show(10, False)
+-------+---------------+----------+
|feature|metric |app |
+-------+---------------+----------+
|basic |basic_read |promotions|
|basic |basic_update |promotions|
|basic |basic_write |promotions|
|geo |geo_update |admin |
|geo |geo_write |admin |
|mobile |mobile_executed|admin |
+-------+---------------+----------+