我正在尝试创建一个计时器触发器 azure 函数,该函数从 blob 中获取数据,对其进行聚合,然后将这些聚合放入 cosmosDB 中。我之前尝试使用 azure 函数中的绑定来使用 blob 作为输入,但我被告知这是不正确的(请参阅此线程:Azure 函数 python 命名参数没有值)。
我现在正在使用 SDK 并遇到以下问题:
import sys, os.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'myenv/Lib/site-packages')))
import json
import pandas as pd
from azure.storage.blob import BlockBlobService
data = BlockBlobService(account_name='accountname', account_key='accountkey')
container_name = ('container')
generator = data.list_blobs(container_name)
for blob in generator:
print("{}".format(blob.name))
json = json.loads(data.get_blob_to_text('container', open(blob.name)))
df = pd.io.json.json_normalize(json)
print(df)
这会导致错误:
IOError: [Errno 2] No such file or directory: 'test.json'
我意识到这可能是一个绝对路径问题,但我不确定它如何与 azure 存储一起使用。关于如何规避这种情况的任何想法?
通过执行以下操作使其“工作”:
for blob in generator:
loader = data.get_blob_to_text('kvaedevdystreamanablob',blob.name,if_modified_since=delta)
json = json.loads(loader.content)
这适用于一个 json 文件,即我只有一个在存储中,但是当添加更多时,我收到此错误:
ValueError: Expecting object: line 1 column 21907 (char 21906)
即使我添加if_modified_since
为只接收一个 blob,也会发生这种情况。如果我想出什么,会更新。欢迎随时提供帮助。
另一个更新:我的数据通过流分析进入,然后下降到 blob。我选择数据应该以数组的形式出现,这就是发生错误的原因。当流终止时,blob 不会立即附加]
到 json 中的 EOF 行,因此 json 文件无效。现在将尝试在流分析中逐行使用而不是数组。
弄清楚了。最后,这是一个非常简单的修复:
我必须确保 blob 中的每个 json 条目都少于 1024 个字符,否则会创建一个新行,从而使读取行出现问题。
遍历每个 blob 文件、读取并添加到列表的代码如下:
data = BlockBlobService(account_name='accname', account_key='key')
generator = data.list_blobs('collection')
dataloaded = []
for blob in generator:
loader = data.get_blob_to_text('collection',blob.name)
trackerstatusobjects = loader.content.split('\n')
for trackerstatusobject in trackerstatusobjects:
dataloaded.append(json.loads(trackerstatusobject))
从这里你可以添加到数据框并做你想做的任何事情:) 如果有人偶然发现类似的问题,希望这会有所帮助。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句