Skip steps in fsevents queue

Martin

I'm currently monitoring a folder using fsevents. Every time a file is added, some code is run on that file. A new file is added to the folder every second.

from fsevents import Observer, Stream

def file_event_callback(event):
    # code 256 for adding file to folder
    if event.mask == 256:
        fileChanged = event.name
        # do stuff with fileChanged file

if __name__ == "__main__":
    observer = Observer()   
    observer.start()
    stream = Stream(file_event_callback, 'folder', file_events=True)
    observer.schedule(stream)
    observer.join()

This works quite well. The only problem is that the library builds a queue of events, one for every file added to the folder. The code executed within file_event_callback can take more than a second. When that happens, the other items in the queue should be skipped so that only the newest one is used.

How can I skip items in the queue so that, once the current file has finished processing, only the latest addition to the folder is handled?

I tried using watchdog first, but as this has to run on a Mac I had some trouble making it work the way I wanted.

abarnert

I don't know exactly what library you're using, and when you say "this is building a queue…" I have no idea what "this" you're referring to… but an obvious answer is to stick your own queue in front of whatever it's using, so you can manipulate that queue directly. For example:

import queue
import threading

def skip_get(q):
    value = q.get(block=True)
    try:
        while True:
            value = q.get(block=False)
    except queue.Empty:
        return value

q = queue.Queue()

def file_event_callback(event):
    # code 256 for adding file to folder
    if event.mask == 256:
        fileChanged = event.name
        q.put(fileChanged)

def consumer():
    while True:
        fileChanged = skip_get(q)
        if fileChanged is None:
            return
        # do stuff with fileChanged

Now, before you start up the observer, do this:

t = threading.Thread(target=consumer)
t.start()

And at the end:

observer.join()
q.put(None)
t.join()

So, how does this work?

First, let's look at the consumer side. When you call q.get(), this pops the first thing off the queue. But what if nothing is there? That's what the block argument is for. If it's false, the get will raise a queue.Empty exception. If it's true, the get will wait forever (in a thread-safe way) until something appears to be popped. So, by blocking once, we handle the case where there's nothing to read yet. By then looping without blocking, we consume anything else on the queue, to handle the case where there are too many things to read. Because we keep reassigning value to whatever we popped, what we end up with is the last thing put on the queue.
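
To see the skipping behaviour in isolation, here is a minimal sketch that runs skip_get without fsevents or threads. The queue name demo_q and the filenames are made up for illustration; the point is only that three pending items collapse into the newest one.

```python
import queue

def skip_get(q):
    """Block until one item arrives, then drain the queue and keep the newest."""
    value = q.get(block=True)
    try:
        while True:
            # Non-blocking get: raises queue.Empty once nothing is left.
            value = q.get(block=False)
    except queue.Empty:
        return value

demo_q = queue.Queue()
for name in ["a.txt", "b.txt", "c.txt"]:  # hypothetical filenames
    demo_q.put(name)

latest = skip_get(demo_q)
print(latest)          # c.txt
print(demo_q.empty())  # True: the older entries were discarded
```

The first two names are popped and thrown away, so the consumer only ever works on the most recent file.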

Now, let's look at the producer side. When you call q.put(value), that just puts value on the queue. Unless you've put a size limit on the queue (which I haven't), there's no way this could block, so you don't have to worry about any of that. But now, how do you signal the consumer thread that you're finished? It's going to be waiting in q.get(block=True) forever; the only way to wake it up is to give it some value to pop. By pushing a sentinel value (in this case, None is fine, because it's not valid as a filename), and making the consumer handle that None by quitting, we give ourselves a nice, clean way to shut down. (And because we never push anything after the None, there's no chance of accidentally skipping it.) So, we can just push None, then be sure that (barring any other bugs) the consumer thread will eventually quit, which means we can do t.join() to wait until it does without fear of deadlock.
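
Here is a rough end-to-end sketch of that shutdown sequence with the fsevents parts swapped out: a plain loop stands in for the callback, and appending to a list stands in for the real work. The names processed and names, and the filenames themselves, are assumptions for the demo.

```python
import queue
import threading

def skip_get(q):
    value = q.get(block=True)
    try:
        while True:
            value = q.get(block=False)
    except queue.Empty:
        return value

q = queue.Queue()
processed = []  # filenames the consumer actually handled

def consumer():
    while True:
        file_changed = skip_get(q)
        if file_changed is None:  # sentinel: the producer is finished
            return
        processed.append(file_changed)  # stand-in for the real work

t = threading.Thread(target=consumer)
t.start()

# Stand-in for the fsevents callback firing three times (hypothetical names).
names = ["frame1.png", "frame2.png", "frame3.png"]
for name in names:
    q.put(name)

q.put(None)        # nothing is ever pushed after the sentinel
t.join(timeout=5)  # returns once the consumer has popped None
print(t.is_alive())  # False
```

One thing to be aware of: if files are still sitting in the queue when None is pushed, skip_get drains them along with everything else, so processed may contain only some of the names, or none at all. That matches the "only the newest matters" requirement, but it does mean pending files are dropped at shutdown.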


You could also do this more simply with a Condition. If you think about how a queue actually works, it's just a list (or deque, or whatever) protected by a condition: the consumer waits on the condition until there's something available, and the producer makes something available by adding it to the list and signaling the condition. If you only ever want the last value, there's really no reason for the list. So, you can do this:

class OneQueue(object):
    def __init__(self):
        self.value = None
        self.condition = threading.Condition()
        self.sentinel = object()
    def get(self):
        with self.condition:
            while self.value is None:
                self.condition.wait()
            value, self.value = self.value, None
            return value
    def put(self, value):
        with self.condition:
            self.value = value
            self.condition.notify()
    def close(self):
        self.put(self.sentinel)

(Because I'm now using None to signal that nothing is available, I had to create a separate sentinel to signal that we're done.)

The problem with this design is that if the producer puts multiple values while the consumer is too busy to handle them, the consumer can miss some of them. In this case, though, that "problem" is exactly what you were looking for.
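
As a sketch of how a consumer might use OneQueue, the class is repeated below so the snippet runs on its own. The consumer loop, the slot and handled names, and the filenames are assumptions for illustration. The consumer compares against the sentinel with "is" instead of checking for None.

```python
import threading

class OneQueue(object):
    def __init__(self):
        self.value = None
        self.condition = threading.Condition()
        self.sentinel = object()
    def get(self):
        with self.condition:
            while self.value is None:
                self.condition.wait()
            value, self.value = self.value, None
            return value
    def put(self, value):
        with self.condition:
            self.value = value  # overwrites whatever was not yet consumed
            self.condition.notify()
    def close(self):
        self.put(self.sentinel)

slot = OneQueue()

# Three puts with nobody reading: only the last one survives.
for name in ("a.txt", "b.txt", "c.txt"):  # hypothetical filenames
    slot.put(name)
latest = slot.get()
print(latest)  # c.txt

handled = []

def consumer():
    while True:
        value = slot.get()
        if value is slot.sentinel:  # identity check against the sentinel
            return
        handled.append(value)       # stand-in for the real work

t = threading.Thread(target=consumer)
t.start()
slot.close()       # wakes the consumer with the sentinel
t.join(timeout=5)
print(t.is_alive())  # False
```

Because put simply overwrites the stored value, a put that arrives after close() would replace the sentinel, so the producer needs to be stopped before closing.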

Still, using lower-level tools always means there's more to get wrong. That is especially dangerous with thread synchronization, whose problems are hard to wrap your head around and hard to debug even when you understand them. You might be better off using a Queue anyway.
