|
Server : Apache/2.2.17 (Unix) mod_ssl/2.2.17 OpenSSL/0.9.8e-fips-rhel5 DAV/2 PHP/5.2.17 System : Linux localhost 2.6.18-419.el5 #1 SMP Fri Feb 24 22:47:42 UTC 2017 x86_64 User : nobody ( 99) PHP Version : 5.2.17 Disable Function : NONE Directory : /usr/lib64/python2.4/site-packages/sabayon/ |
Upload File : |
#
# Copyright (C) 2005 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#
import os
import os.path
import gobject
import gamin
import util
import fnmatch
N_WATCHES_LIMIT = 200
CHANGED = gamin.GAMChanged
DELETED = gamin.GAMDeleted
CREATED = gamin.GAMCreated
def dprint (fmt, *args):
util.debug_print (util.DEBUG_DIRMONITOR, fmt % args)
def event_to_string (event):
if event == CHANGED:
return "changed"
elif event == DELETED:
return "deleted"
elif event == CREATED:
return "created"
else:
return "invalid"
class DirectoryMonitor:
def __init__ (self, directory, callback, data = None):
self.directory = directory
self.callback = callback
self.data = data
self.mon = None
self.watches = {}
self.too_many_watches = False
self.fd = -1
self.io_watch = None
self.dirs_to_ignore = []
self.files_to_ignore = []
def set_directories_to_ignore (self, dirs):
assert self.mon == None
self.dirs_to_ignore = dirs
dprint ("Ignoring directories %s", self.dirs_to_ignore)
def set_files_to_ignore (self, files):
assert self.mon == None
self.files_to_ignore = files
dprint ("Ignoring files %s", self.files_to_ignore)
#
# call the user level processing
#
def __invoke_callback (self, path, event):
if self.data:
self.callback (path, event, self.data)
else:
self.callback (path, event)
#
# Called from the main loop when we have data
#
def __pending_data (self, fd, condition):
try:
ret = self.mon.handle_events ()
except:
util.print_exception ()
return True
#
# Processing of a gamin callback
#
def __handle_gamin_event (self, path, event, monitor_file):
if event == CHANGED or event == DELETED or event == CREATED:
if not os.path.isabs (path):
path = os.path.join (monitor_file, path)
dprint ("Got gamin event '%s' on '%s'", event_to_string (event), path)
if event == CREATED and os.path.isdir (path):
self.__monitor_dir_recurse (path, True)
elif event == DELETED:
if path != self.directory and self.watches.has_key (path):
del self.watches[path]
if len (self.watches) < N_WATCHES_LIMIT:
self.too_many_watches = False
self.mon.stop_watch (path)
if not self.__should_ignore_dir (path) and \
not self.__should_ignore_file (path):
self.__invoke_callback (path, event)
def __should_ignore_dir (self, dir):
dir = os.path.normpath (dir)
for ignore_dir in self.dirs_to_ignore:
ignore_path = os.path.normpath (os.path.join (self.directory, ignore_dir))
if fnmatch.fnmatch (dir, ignore_path):
dprint ("Ignoring directory '%s'", dir)
return True
parent = os.path.dirname (dir)
if parent != dir:
return self.__should_ignore_dir (parent)
else:
return False
def __should_ignore_file (self, file):
file = os.path.normpath (file)
for ignore_file in self.files_to_ignore:
ignore_path = os.path.normpath (os.path.join (self.directory, ignore_file))
if fnmatch.fnmatch (file, ignore_path):
dprint ("Ignoring file '%s'", file)
return True
return self.__should_ignore_dir (os.path.dirname (file))
def __monitor_dir (self, dir):
if len (self.watches) >= N_WATCHES_LIMIT:
if not self.too_many_watches:
print "Too many directories to watch on %s" % (self.directory)
self.too_many_watches = True
return
if self.__should_ignore_dir (dir):
return
try:
self.mon.watch_directory (dir, self.__handle_gamin_event, dir)
except:
print _("Failed to add monitor for %s") % (dir)
util.print_exception ()
def __monitor_dir_recurse (self, dir, new_dir = False):
if self.too_many_watches:
return
if dir != self.directory:
self.__monitor_dir (dir)
for entry in os.listdir (dir):
path = os.path.join (dir, entry)
if self.__should_ignore_dir (path) or \
self.__should_ignore_file (path):
continue
if new_dir:
self.__invoke_callback (path, CREATED)
if os.path.isdir (path):
self.__monitor_dir_recurse (path, new_dir)
def start (self):
if self.mon != None:
return
dprint ("Starting to recursively monitor '%s'", self.directory)
self.mon = gamin.WatchMonitor ()
# ignore (End)Exists events since we scan the tree ourselves
try:
self.mon.no_exists ()
except:
pass
self.fd = self.mon.get_fd ()
self.io_watch = gobject.io_add_watch (self.fd,
gobject.IO_IN|gobject.IO_PRI,
self.__pending_data)
self.__monitor_dir (self.directory)
self.__monitor_dir_recurse (self.directory)
def stop (self):
if self.mon == None:
return
dprint ("Stopping recursive monitoring of '%s'", self.directory)
for path in self.watches:
self.mon.stop_watch (path)
gobject.source_remove (self.io_watch)
self.io_watch = 0
self.mon.disconnect ()
self.mon = None
self.fd = -1
def run_unit_tests ():
import tempfile
import shutil
temp_path = tempfile.mkdtemp (prefix = "test-monitor-")
def handle_change (path, event, data):
(expected, main_loop) = data
if len (expected) > 0:
i = 0
for (expected_path, expected_event) in expected:
if expected_path == path and expected_event == event:
break
i += 1
if i < len (expected):
del expected[i]
if len (expected) == 0:
main_loop.quit ()
def expect (expected, path, event):
expected.append ((path, event))
main_loop = gobject.MainLoop ()
expected = []
def should_not_be_reached (expected):
for (path, event) in expected:
print _("Expected event: %s %s") % (path, event_to_string (event))
assert False
return True
timeout = gobject.timeout_add (5 * 1000, should_not_be_reached, expected)
monitor = DirectoryMonitor (temp_path, handle_change, (expected, main_loop))
monitor.set_directories_to_ignore (["bar"])
monitor.set_files_to_ignore (["foobar/foo/foo.txt"])
monitor.start ()
expect (expected, os.path.join (temp_path, "foo.txt"), CREATED)
f = file (os.path.join (temp_path, "foo.txt"), "w")
f.close ()
# ignored
# expect (expected, os.path.join (temp_path, "bar"), CREATED)
os.mkdir (os.path.join (temp_path, "bar"))
expect (expected, os.path.join (temp_path, "foobar"), CREATED)
expect (expected, os.path.join (temp_path, "foobar/foo"), CREATED)
expect (expected, os.path.join (temp_path, "foobar/foo/bar"), CREATED)
expect (expected, os.path.join (temp_path, "foobar/foo/bar/foo"), CREATED)
expect (expected, os.path.join (temp_path, "foobar/foo/bar/foo/bar"), CREATED)
os.makedirs (os.path.join (temp_path, "foobar/foo/bar/foo/bar"))
# ignored:
# expect (expected, os.path.join (temp_path, "foobar/foo/foo.txt"), CREATED)
f = file (os.path.join (temp_path, "foobar/foo/foo.txt"), "w")
f.close ()
main_loop.run ()
expect (expected, os.path.join (temp_path, "foobar/foo/bar/foo/bar"), DELETED)
expect (expected, os.path.join (temp_path, "foobar/foo/bar/foo"), DELETED)
expect (expected, os.path.join (temp_path, "foobar/foo/bar"), DELETED)
# ignored:
# expect (expected, os.path.join (temp_path, "foobar/foo/foo.txt"), DELETED)
expect (expected, os.path.join (temp_path, "foobar/foo"), DELETED)
expect (expected, os.path.join (temp_path, "foobar"), DELETED)
# FIXME: we should be getting this event, but we don't seem to be
# expect (expected, os.path.join (temp_path, "foo.txt"), DELETED)
# ignore:
# expect (expected, os.path.join (temp_path, "bar"), DELETED)
expect (expected, temp_path, DELETED)
shutil.rmtree (temp_path, True)
main_loop.run ()
gobject.source_remove (timeout)
monitor.stop ()