我编写了一个自定义转换器类POSWordTagger
。我的_transform()
方法代码是,
def _transform(self, dataset):
def f(s):
tokens = nltk.tokenize.wordpunct_tokenize(s)
pos_tags = nltk.pos_tag(tokens)
return pos_tags
t = ArrayType(StringType())
out_col = self.getOutputCol()
in_col = dataset[self.getInputCol()]
return dataset.withColumn(out_col, udf(f, t)(in_col))
我按如下方式调用我的变压器类,
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
sentenceDataFrame = sqlContext.createDataFrame([
(0, "Hi I heard about Spark"),
(0, "I wish Java could use case classes"),
(1, "Logistic regression models are neat")
], ["label", "sentence"])
pos_tagger = POSWordTagger(inputCol="sentence", outputCol="pos")
pos_output=pos_tagger.transform(sentenceDataFrame)
pos_output.select("pos").show()
我得到的输出是,
+--------------------+
| pos|
+--------------------+
|[[Ljava.lang.Obje...|
|[[Ljava.lang.Obje...|
|[[Ljava.lang.Obje...|
+--------------------+
即使我将架构作为 传递ArrayType(StringType())
,我也将对象引用作为输出。但是如果我只返回tokens
as 输出而不是pos_tags
从我的_transform()
方法中返回,我会正确地获得输出,即令牌列表。谁能让我知道我错过了什么或做错了什么?任何帮助表示赞赏。我的环境是 Spark 1.6 和 Python 2.7。
看下面的例子,pos_tag
返回list(tuple(string))
:
>>> text = word_tokenize("And now for something completely different")
>>> nltk.pos_tag(text)
[('And', 'CC'), ('now', 'RB'), ('for', 'IN'), ('something', 'NN'),
('completely', 'RB'), ('different', 'JJ')]
所以你的代码中的问题看起来在这里ArrayType(StringType())
,所以它应该是ArrayType(ArrayType(StringType()))
import pyspark.sql.types as T
import pyspark.sql.functions as F
def flattenArray(obj):
return reduce(lambda x,y:x+y, obj)
pos_output.select(F.udf(flattenArray, T.ArrayType(T.StringType()))("pos").alias("pos")).show(truncate = False)
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句