我已经测试过这样的代码:
# filename_queue comes from tf.train.string_input_producer
features, labels, filename_queue = read_batch_data(file_list, 10)
with tf.Session() as sess:
    init = tf.initialize_all_variables()
    sess.run(init)
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    counter = 0
    try:
        while not coord.should_stop():
            counter = counter + 1
            value = features.eval()
            if counter % 1000 == 0:
                # check whether new data has been inserted into the queue
                print(counter, sum(value))
                # '//' keeps this an integer division on both Python 2 and 3,
                # so index cycles through 0, 1, 2
                index = (counter // 1000) % 3
                # NOTE(review): building a new enqueue op on every iteration
                # grows the graph; hoisting it out of the loop would be cheaper
                enqueue_op = filename_queue.enqueue(['a%d.csv' % index])
                sess.run([enqueue_op])
    # original was missing the ':' here, which is a syntax error
    except tf.errors.OutOfRangeError:
        ...
但是看起来该图仍然使用原始文件队列,并且从不读取新数据。
我怀疑您有一个包含旧文件名集合的大型预取缓冲区,因此当您添加新文件名时,只有在预取缓冲区耗尽之后才能读到它。默认情况下,tf.train.string_input_producer
会在文件名集合中无限循环,并填充一个大小为 32 的预取缓冲区。
如果需要在运行时修改文件名列表,那么与其使用 string_input_producer,不如改用 FIFOQueue
并手动向其中填充文件名,这样会更容易。要注意:如果提供的样本不足,主线程可能会一直挂起,
因此可以为会话设置 config.operation_timeout_in_ms=5000
之类的超时。
例如,下面的示例会先打印 /temp/pipeline/0 文件中的条目(文件中有 10 个条目),之后打印来自 /temp/pipeline/1 的条目。
创建一些测试数据
def dump_numbers_to_file(fname, start_num, end_num):
    """Write the integers in [start_num, end_num), one per line, to fname."""
    lines = (str(n) + "\n" for n in range(start_num, end_num))
    with open(fname, 'w') as out:
        out.writelines(lines)
# Create some test data: num_files files under file_root, each holding
# num_entries_per_file consecutive integers, one per line.
num_files = 10
num_entries_per_file = 10
file_root = "/temp/pipeline"
# os.makedirs is portable and avoids spawning a shell, unlike
# the original os.system('mkdir -p ' + file_root)
os.makedirs(file_root, exist_ok=True)
for fi in range(num_files):
    fname = file_root + "/" + str(fi)
    dump_numbers_to_file(fname, fi * num_entries_per_file, (fi + 1) * num_entries_per_file)
帮助程序实用程序来创建会话
def create_session():
    """Resets local session, returns new InteractiveSession.

    The session config logs device placement, caps this process at 30% of
    GPU memory (don't hog all vRAM), and sets a 15 s operation timeout so
    a dequeue from an under-filled queue terminates instead of hanging.
    """
    cfg = tf.ConfigProto(log_device_placement=True)
    cfg.gpu_options.per_process_gpu_memory_fraction = 0.3
    cfg.operation_timeout_in_ms = 15000
    return tf.InteractiveSession("", config=cfg)
运行你的例子
# Run the example: drain /temp/pipeline/0 (enqueued twice), then enqueue
# /temp/pipeline/1 by hand and read its entries as well.
tf.reset_default_graph()
filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string])
enqueue_op = filename_queue.enqueue("/temp/pipeline/0")
sess = create_session()
sess.run(enqueue_op)
sess.run(enqueue_op)
# filename queue now has [/temp/pipeline/0, /temp/pipeline/0]
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
numeric_val, = tf.decode_csv(value, record_defaults=[[-1]])
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
# Python-2 'print x' statements were converted to print() calls so this
# also runs under Python 3; loop bodies re-indented (lost in the scrape).
for i in range(10):
    print(sess.run([numeric_val]))
# filename queue now has [/temp/pipeline/0]
print('size before', sess.run(filename_queue.size()))
sess.run(filename_queue.enqueue("/temp/pipeline/1"))
# filename queue now has [/temp/pipeline/0, /temp/pipeline/1]
print('size after', sess.run(filename_queue.size()))
for i in range(10):
    print(sess.run([numeric_val]))
# filename queue now has [/temp/pipeline/1]
for i in range(10):
    print(sess.run([numeric_val]))
# filename queue is now empty; the next sess.run([numeric_val]) would hang
coord.request_stop()
coord.join(threads)
你应该看到
[0]
[1]
[2]
[3]
[4]
[5]
[6]
[7]
[8]
[9]
size before 1
size after 2
[0]
[1]
[2]
[3]
[4]
[5]
[6]
[7]
[8]
[9]
[10]
[11]
[12]
[13]
[14]
[15]
[16]
[17]
[18]
[19]
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句