|
Server : Apache/2.2.17 (Unix) mod_ssl/2.2.17 OpenSSL/0.9.8e-fips-rhel5 DAV/2 PHP/5.2.17 System : Linux localhost 2.6.18-419.el5 #1 SMP Fri Feb 24 22:47:42 UTC 2017 x86_64 User : nobody ( 99) PHP Version : 5.2.17 Disable Function : NONE Directory : /proc/21573/root/usr/lib/python2.4/site-packages/setroubleshoot/ |
Upload File : |
# Authors: John Dennis <jdennis@redhat.com>
#
# Copyright (C) 2006,2007,2008 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#
__all__ = [
'SignatureMatch',
'SEFilter',
'SEFaultSignature',
'SEFaultSignatureInfo',
'SEFaultSignatureSet',
'SEFaultSolution',
'SEFaultSignatureUser',
'SEEnvironment',
'SEDatabaseProperties',
'SEFaultUserInfo',
'SEFaultUserSet',
'SEEmailRecipient',
'SEEmailRecipientSet',
'FILTER_NEVER',
'FILTER_ALWAYS',
'FILTER_AFTER_FIRST',
'filter_text'
]
# FIXME
import gettext
from setroubleshoot.config import parse_config_setting, get_config
gettext.install(domain = get_config('general', 'i18n_text_domain'),
localedir = get_config('general', 'i18n_locale_dir'),
unicode = False,
codeset = get_config('general', 'i18n_encoding'))
from setroubleshoot.log import log_init
log_init('test', {'console':True,
'level':'debug'})
from setroubleshoot.log import *
# FIXME end
from setroubleshoot.config import get_config
from setroubleshoot.errcode import *
from setroubleshoot.util import *
from setroubleshoot.xml_serialize import *
# FIXME from setroubleshoot.log import *
from setroubleshoot.util import *
from setroubleshoot.html_util import *
import setroubleshoot.uuid as uuid
from setroubleshoot.audit_data import *
import selinux
import re
from types import *
# Don't reuse the numeric values!
FILTER_NEVER = 0
FILTER_ALWAYS = 4
FILTER_AFTER_FIRST = 8
filter_text = {
FILTER_NEVER : _("Never Ignore"),
FILTER_ALWAYS : _("Ignore Always"),
FILTER_AFTER_FIRST : _("Ignore After First Alert"),
}
map_filter_value_to_name = {
FILTER_NEVER : 'never',
FILTER_ALWAYS : 'always',
FILTER_AFTER_FIRST : 'after_first',
}
map_filter_name_to_value = {
'never' : FILTER_NEVER,
'always' : FILTER_ALWAYS,
'after_first' : FILTER_AFTER_FIRST,
}
#------------------------------------------------------------------------
class SignatureMatch(object):
def __init__(self, siginfo, score):
self.siginfo = siginfo
self.score = score
class SEEnvironment(XmlSerialize):
_xml_info = {
'version' : {'XMLForm':'attribute','default':lambda: '1.0' },
'platform' : {'XMLForm':'element' },
'kernel' : {'XMLForm':'element' },
'policy_type' : {'XMLForm':'element' },
'policy_rpm' : {'XMLForm':'element' },
'enforce' : {'XMLForm':'element' },
'selinux_enabled' : {'XMLForm':'element', 'import_typecast':boolean, },
'selinux_mls_enabled' : {'XMLForm':'element', 'import_typecast':boolean, },
'policyvers' : {'XMLForm':'element' },
'hostname' : {'XMLForm':'element' },
'uname' : {'XMLForm':'element' },
}
def __init__(self):
super(SEEnvironment, self).__init__()
def update(self):
import platform
# security_getenforce is the same as the getenforce command.
# selinux_getenforcemode tells you what is set in /etc/selinux/config
self.platform, self.kernel = get_os_environment()
self.policy_type = selinux.selinux_getpolicytype()[1]
#self.policy_rpm = get_rpm_nvr_by_name("selinux-policy")
self.policy_rpm = get_rpm_nvr_by_name_temporary("selinux-policy")
self.policyvers = str(selinux.security_policyvers())
enforce = selinux.security_getenforce()
if enforce == 0:
self.enforce = "Permissive"
else:
self.enforce = "Enforcing"
self.selinux_enabled = bool(selinux.is_selinux_enabled())
self.selinux_mls_enabled = bool(selinux.is_selinux_mls_enabled())
self.hostname = platform.node()
self.uname = " ".join(platform.uname())
def __ne__(self, other):
return not self.__eq__(other)
def __eq__(self, other):
for name in self._xml_info.keys():
if getattr(self, name) != getattr(other, name):
return False
return True
class SEFaultSolution(XmlSerialize):
_xml_info = {
'version' : {'XMLForm':'attribute', 'default': lambda: '1.0' },
'summary' : {'XMLForm':'element', 'export_typecast':string_to_cdata_xmlnode },
'problem_description' : {'XMLForm':'element', 'export_typecast':string_to_cdata_xmlnode },
'fix_description' : {'XMLForm':'element', 'export_typecast':string_to_cdata_xmlnode },
'fix_cmd' : {'XMLForm':'element' },
'rpm_list' : {'XMLForm':'element', 'list':'rpm', },
'rpm_version' : {'XMLForm':'element' },
'policy_version' : {'XMLForm':'element' },
}
def __init__(self, summary, problem_description, fix_description, fix_cmd):
super(SEFaultSolution, self).__init__()
self.summary = summary
self.problem_description = problem_description
self.fix_description = fix_description
self.fix_cmd = fix_cmd
class SEFilter(XmlSerialize):
_xml_info = {
'filter_type' : {'XMLForm':'element', 'import_typecast':int, 'default':lambda: FILTER_NEVER },
'count' : {'XMLForm':'element', 'import_typecast':int, 'default':lambda: 0 },
}
def __init__(self, filter_type=FILTER_NEVER):
super(SEFilter, self).__init__()
self.filter_type = filter_type
class SEFaultSignatureUser(XmlSerialize):
_xml_info = {
'username' : {'XMLForm':'attribute' },
'seen_flag' : {'XMLForm':'attribute', 'import_typecast':boolean, 'default': lambda: False },
'delete_flag' : {'XMLForm':'attribute', 'import_typecast':boolean, 'default': lambda: False },
'filter' : {'XMLForm':'element', 'import_typecast':SEFilter, 'default': lambda: SEFilter() },
}
def __init__(self, username):
super(SEFaultSignatureUser, self).__init__()
self.username = username
def update_item(self, item, data):
if not item in self._names:
raise ProgramError(ERR_NOT_MEMBER, 'item (%s) is not a defined member' % item)
if item == 'username':
raise ProgramError(ERR_ILLEGAL_USER_CHANGE, 'changing the username is illegal')
setattr(self, item, data)
def update_filter(self, filter_type, data=None):
if debug:
log_sig.debug("update_filter: filter_type=%s data=%s", map_filter_value_to_name.get(filter_type, 'unknown'), data)
if filter_type == FILTER_NEVER or \
filter_type == FILTER_AFTER_FIRST or \
filter_type == FILTER_ALWAYS:
if debug:
log_sig.debug("update_filter: !!!")
self.filter = SEFilter(filter_type=filter_type)
return True
else:
raise ValueError("Bad filter_type (%s)" % filter_type)
# --
class AttributeValueDictionary(XmlSerialize):
_xml_info = 'unstructured'
def __init__(self):
super(AttributeValueDictionary, self).__init__()
class SEFaultSignature(XmlSerialize):
_xml_info = {
'version' : {'XMLForm':'attribute','default':lambda: '3.0', },
'analysis_id' : {'XMLForm':'element', },
'host' : {'XMLForm':'element' },
'access' : {'XMLForm':'element', 'list':'operation', },
'scontext' : {'XMLForm':'element', 'import_typecast':AvcContext },
'tcontext' : {'XMLForm':'element', 'import_typecast':AvcContext },
'tclass' : {'XMLForm':'element', },
'tpath' : {'XMLForm':'element' },
'port' : {'XMLForm':'element', 'import_typecast':int, },
}
def __init__(self, **kwds):
super(SEFaultSignature, self).__init__()
for k,v in kwds.items():
setattr(self, k, v)
class SEFaultSignatureInfo(XmlSerialize):
_xml_info = {
'analysis_id' : {'XMLForm':'element', },
'audit_event' : {'XMLForm':'element', 'import_typecast':AuditEvent },
'source' : {'XMLForm':'element' },
'spath' : {'XMLForm':'element' },
'tpath' : {'XMLForm':'element' },
'src_rpm_list' : {'XMLForm':'element', 'list':'rpm', },
'tgt_rpm_list' : {'XMLForm':'element', 'list':'rpm', },
'scontext' : {'XMLForm':'element', 'import_typecast':AvcContext },
'tcontext' : {'XMLForm':'element', 'import_typecast':AvcContext },
'tclass' : {'XMLForm':'element', },
'port' : {'XMLForm':'element', 'import_typecast':int, },
'host' : {'XMLForm':'element', },
'sig' : {'XMLForm':'element', 'import_typecast':SEFaultSignature },
'solution' : {'XMLForm':'element', 'import_typecast':SEFaultSolution },
'category' : {'XMLForm':'element' },
'environment' : {'XMLForm':'element', 'import_typecast':SEEnvironment },
'line_numbers' : {'XMLForm':'element', 'list':'line', 'import_typecast':int, },
'first_seen_date' : {'XMLForm':'element', 'import_typecast':TimeStamp },
'last_seen_date' : {'XMLForm':'element', 'import_typecast':TimeStamp },
'report_count' : {'XMLForm':'element', 'import_typecast':int, 'default':lambda: 0 },
'local_id' : {'XMLForm':'element' },
'users' : {'XMLForm':'element', 'list':'user', 'import_typecast':SEFaultSignatureUser, },
}
merge_include = ['audit_event', 'tpath', 'src_rpm_list', 'tgt_rpm_list',
'scontext', 'tcontext', 'tclass', 'port',
'solution', 'category', 'environment',
'last_seen_date'
]
def __init__(self, **kwds):
super(SEFaultSignatureInfo, self).__init__()
for k,v in kwds.items():
setattr(self, k, v)
def update_merge(self, siginfo):
for name in self.merge_include:
setattr(self, name, getattr(siginfo, name))
self.line_numbers = merge_lists(self.line_numbers, siginfo.line_numbers)
def get_user_data(self, username):
for user in self.users:
if user.username == username:
return user
if debug:
log_sig.debug("new SEFaultSignatureUser for %s", username)
user = SEFaultSignatureUser(username)
self.users.append(user)
return user
def find_filter_by_username(self, username):
if debug:
log_sig.debug("find_filter_by_username %s", username)
filter = None
user_data = self.get_user_data(username)
if user_data is not None:
filter = user_data.filter
return filter
def update_user_filter(self, username, filter_type, data=None):
user_data = self.get_user_data(username)
user_data.update_filter(filter_type, data)
def evaluate_filter_for_user(self, username, filter_type=None):
action = 'display'
f = self.find_filter_by_username(username)
if debug:
log_rpc.debug("evaluate_filter_for_user: found %s user's filter = %s", username, f)
if f is not None:
if filter_type is not None:
f.filter_type = filter_type
action = self.evaluate_filter(f)
if debug:
log_alert.debug("evaluate_filter_for_user: found filter for %s: %s\n%s",
username, action, f)
return action
def evaluate_filter(self, filter):
filter_type = filter.filter_type
action = 'display'
if filter_type == FILTER_NEVER:
action = 'display'
elif filter_type == FILTER_AFTER_FIRST:
if filter.count == 0:
action = 'display'
else:
action = 'ignore'
elif filter_type == FILTER_ALWAYS:
action = 'ignore'
else:
raise ValueError("unknown filter_type (%s)" % (filter_type))
filter.count += 1
return action
def format_rpm_list(self, rpm_list):
if isinstance(rpm_list, list):
if len(rpm_list) > 0:
return " ".join(rpm_list)
else:
return ""
else:
return default_text(None)
def format_target_object(self):
return "%s [ %s ]" % (self.tpath, self.tclass)
def description_adjusted_for_permissive(self):
permissive_msg = _("SELinux is in permissive mode, the operation would have been denied but was permitted due to permissive mode.")
if self.environment.enforce == 'Permissive':
return '<font color="FF0000">[%s]</font><p>%s' % (permissive_msg, self.solution.problem_description)
else:
return self.solution.problem_description
def format_html(self, foreground_color="000000", background_color='#FFFFFF'):
env = self.environment
summary = self.solution.summary
description = self.description_adjusted_for_permissive()
fix = self.solution.fix_description
fixcmd = self.solution.fix_cmd
if self.line_numbers is None:
line_numbers = None
else:
self.line_numbers.sort()
line_numbers = ', '.join([str(x) for x in self.line_numbers])
tr1_fmt = '<tr bgcolor="%s"><td><font color="%s">%%s</font></td></tr>\n' % \
(foreground_color, background_color)
tr2_fmt = '<tr><td><font color="%s">%%s</font></td></tr>\n' % \
(foreground_color)
p_fmt = '<p>%s\n'
if fixcmd:
fix += '<br><br>%s<pre>%s</pre>' % (_("The following command will allow this access:"), fixcmd)
html = ''
# Wrap entire alert in one table
html += '<table bgcolor=%s><tr><td>\n' % (background_color)
# 1st table: primary Information
html += '<table width="100%" cellspacing="1" cellpadding="1">\n'
# Note: The following string items returned by a plugin
# already have had their template substitutions HTML escaped
# when the plugin generated it's report
html += tr1_fmt % (_("Summary"))
html += tr2_fmt % (summary)
html += tr1_fmt % (_("Detailed Description"))
html += tr2_fmt % (description)
html += tr1_fmt % (_("Allowing Access"))
html += tr2_fmt % (fix)
html += tr1_fmt % (_("Additional Information"))
html += tr2_fmt % ('')
html += '</table>\n'
# 2nd table: supplementary information
tr1_fmt = '<tr><td><font color="%s">%%s: </td><td>%%s</font></td></tr>\n' % \
(foreground_color)
html += '<table border="0" cellspacing="1" cellpadding="1">\n'
html += tr1_fmt % (_("Source Context"), escape_html(self.scontext.format()))
html += tr1_fmt % (_("Target Context"), escape_html(self.tcontext.format()))
html += tr1_fmt % (_("Target Objects"), escape_html(self.format_target_object()))
html += tr1_fmt % (_("Source"), escape_html(default_text(self.source)))
html += tr1_fmt % (_("Source Path"), escape_html(default_text(self.spath)))
html += tr1_fmt % (_("Port"), escape_html(default_text(self.port)))
html += tr1_fmt % (_("Host"), escape_html(default_text(self.host)))
html += tr1_fmt % (_("Source RPM Packages"), escape_html(default_text(self.format_rpm_list(self.src_rpm_list))))
html += tr1_fmt % (_("Target RPM Packages"), escape_html(default_text(self.format_rpm_list(self.tgt_rpm_list))))
html += tr1_fmt % (_("Policy RPM"), escape_html(default_text(env.policy_rpm)))
html += tr1_fmt % (_("Selinux Enabled"), escape_html(default_text(env.selinux_enabled)))
html += tr1_fmt % (_("Policy Type"), escape_html(default_text(env.policy_type)))
html += tr1_fmt % (_("MLS Enabled"), escape_html(default_text(env.selinux_mls_enabled)))
html += tr1_fmt % (_("Enforcing Mode"), escape_html(default_text(env.enforce)))
html += tr1_fmt % (_("Plugin Name"), escape_html(default_text(self.sig.analysis_id)))
html += tr1_fmt % (_("Host Name"), escape_html(default_text(env.hostname)))
html += tr1_fmt % (_("Platform"), escape_html(default_text(env.uname)))
html += tr1_fmt % (_("Alert Count"), escape_html(default_text(self.report_count)))
html += tr1_fmt % (_("First Seen"), escape_html(default_date_text(self.first_seen_date)))
html += tr1_fmt % (_("Last Seen"), escape_html(default_date_text(self.last_seen_date)))
html += tr1_fmt % (_("Local ID"), escape_html(default_text(self.local_id)))
html += tr1_fmt % (_("Line Numbers"), escape_html(default_text(line_numbers)))
html += '</table>'
html += p_fmt % _("Raw Audit Messages") + ':<br>'
html += '<br>'
for audit_record in self.audit_event.records:
html += escape_html(str(audit_record))
html += '<br>'
# close the entire encapsultating table
html += '</td></tr></table>\n'
return html
def format_text(self):
env = self.environment
summary = self.solution.summary
description = self.description_adjusted_for_permissive()
fix = self.solution.fix_description
fixcmd = self.solution.fix_cmd
if self.line_numbers is None:
line_numbers = None
else:
self.line_numbers.sort()
line_numbers = ', '.join([str(x) for x in self.line_numbers])
line_numbers = html_to_text(line_numbers) # not really html, but this will wrap the text
text = ''
text += html_to_text('<h1>'+_("Summary")+':</h1>'+summary)
text += html_to_text('<p>')
text += html_to_text('<h1>'+_("Detailed Description")+':</h1>'+description)
text += html_to_text('<p>')
text += html_to_text('<h1>'+_("Allowing Access")+':</h1>'+fix)
if fixcmd:
text += html_to_text('<p>')
text += html_to_text('<h1>'+_("The following command will allow this access:")+'</h1><p>')
text += fixcmd
text += html_to_text('<p>')
text += html_to_text('<h1>'+_("Additional Information")+':</h1>')
text += format_2_column_name_value(_("Source Context"), self.scontext.format())
text += format_2_column_name_value(_("Target Context"), self.tcontext.format())
text += format_2_column_name_value(_("Target Objects"), self.format_target_object())
text += format_2_column_name_value(_("Source"), default_text(self.source))
text += format_2_column_name_value(_("Source Path"), default_text(self.spath))
text += format_2_column_name_value(_("Port"), default_text(self.port))
text += format_2_column_name_value(_("Host"), default_text(self.host))
text += format_2_column_name_value(_("Source RPM Packages"), default_text(self.format_rpm_list(self.src_rpm_list)))
text += format_2_column_name_value(_("Target RPM Packages"), default_text(self.format_rpm_list(self.tgt_rpm_list)))
text += format_2_column_name_value(_("Policy RPM"), default_text(env.policy_rpm))
text += format_2_column_name_value(_("Selinux Enabled"), default_text(env.selinux_enabled))
text += format_2_column_name_value(_("Policy Type"), default_text(env.policy_type))
text += format_2_column_name_value(_("MLS Enabled"), default_text(env.selinux_mls_enabled))
text += format_2_column_name_value(_("Enforcing Mode"), default_text(env.enforce))
text += format_2_column_name_value(_("Plugin Name"), default_text(self.sig.analysis_id))
text += format_2_column_name_value(_("Host Name"), default_text(env.hostname))
text += format_2_column_name_value(_("Platform"), default_text(env.uname))
text += format_2_column_name_value(_("Alert Count"), default_text(self.report_count))
text += format_2_column_name_value(_("First Seen"), default_date_text(self.first_seen_date))
text += format_2_column_name_value(_("Last Seen"), default_date_text(self.last_seen_date))
text += format_2_column_name_value(_("Local ID"), default_text(self.local_id))
text += format_2_column_name_value(_("Line Numbers"), default_text(line_numbers))
text += '\n'
text += format_2_column_name_value(_("Raw Audit Messages"), '\n')
for audit_record in self.audit_event.records:
text += str(audit_record)
text += '\n'
text += '\n'
return text
class SEFaultUserInfo(XmlSerialize):
_xml_info = {
'version' : {'XMLForm':'attribute','default':lambda: '1.0' },
'username' : {'XMLForm':'attribute' },
'email_alert' : {'XMLForm':'element', 'import_typecast':boolean, 'default': lambda: False },
'email_address_list' : {'XMLForm':'element', 'list':'email_address', },
}
def __init__(self, username):
super(SEFaultUserInfo, self).__init__()
self.username = username
def add_email_address(self, email_address):
if not email_address in self.email_address_list:
self.email_address_list.append(email_address)
class SEFaultUserSet(XmlSerialize):
_xml_info = {
'version' : {'XMLForm':'attribute','default':lambda: '1.0' },
'user_list' : {'XMLForm':'element', 'list':'user', 'import_typecast':SEFaultUserInfo, },
}
def __init__(self):
super(SEFaultUserSet, self).__init__()
def get_user(self, username):
for user in self.user_list:
if username == user.username:
return user
return None
def add_user(self, username):
if self.get_user(username) is not None:
return
user = SEFaultUserInfo(username)
self.user_list.append(user)
return user
class SEFaultSignatureSet(XmlSerialize):
_xml_info = {
'version' : {'XMLForm':'attribute','default':lambda: '%d.%d' % (DATABASE_MAJOR_VERSION, DATABASE_MINOR_VERSION)},
'users' : {'XMLForm':'element', 'import_typecast':SEFaultUserSet, 'default': lambda: SEFaultUserSet() },
'signature_list' : {'XMLForm':'element', 'list':'siginfo', 'import_typecast':SEFaultSignatureInfo, },
}
def __init__(self):
super(SEFaultSignatureSet, self).__init__()
def siginfos(self):
for siginfo in self.signature_list:
yield siginfo
def add_siginfo(self, siginfo):
self.signature_list.append(siginfo)
return siginfo
def remove_siginfo(self, siginfo):
self.signature_list.remove(siginfo)
def clear(self):
self.signature_list = []
def generate_local_id(self):
return str(uuid.uuid4())
def lookup_local_id(self, local_id):
if local_id is None:
return None
for siginfo in self.signature_list:
if siginfo.local_id == local_id:
return siginfo
return None
def match_signatures(self, pat, criteria='exact', xml_info=SEFaultSignature._xml_info):
match_targets = xml_info.keys()
exact = False
if criteria == 'exact':
exact = True
elif type(criteria) is FloatType:
num_match_targets = len(match_targets)
score_per_match_target = 1.0 / num_match_targets
else:
raise ValueError("unknown criteria = %s" % criteria)
matches = []
for siginfo in self.signature_list:
score = 0.0
sig = siginfo.sig
for name in match_targets:
if getattr(pat, name) == getattr(sig, name):
if exact:
score = 1.0
else:
score += score_per_match_target
else:
if exact:
score = 0.0
break
if exact:
if score == 1.0:
matches.append(SignatureMatch(siginfo, score))
else:
if score >= criteria:
matches.append(SignatureMatch(siginfo, score))
matches.sort((lambda a,b: cmp(b.score, a.score)))
return matches
class SEDatabaseProperties(XmlSerialize):
_xml_info = {
'name' : {'XMLForm':'element' },
'friendly_name' : {'XMLForm':'element' },
'filepath' : {'XMLForm':'element' },
}
def __init__(self, name=None, friendly_name=None, filepath=None):
super(SEDatabaseProperties, self).__init__()
if name is not None:
self.name = name
if friendly_name is not None:
self.friendly_name = friendly_name
if filepath is not None:
self.filepath = filepath
class SEEmailRecipient(XmlSerialize):
_xml_info = {
'address' : {'XMLForm':'element' },
'filter_type' : {'XMLForm':'element', 'import_typecast':int, 'default':lambda: FILTER_AFTER_FIRST },
}
def __init__(self, address, filter_type=None):
super(SEEmailRecipient, self).__init__()
self.address = address
if filter_type is not None:
self.filter_type = filter_type
def __str__(self):
return "%s:%s" % (self.address, map_filter_value_to_name.get(self.filter_type, 'unknown'))
class SEEmailRecipientSet(XmlSerialize):
_xml_info = {
'version' : {'XMLForm':'attribute','default':lambda: '1' },
'recipient_list' : {'XMLForm':'element', 'list':'recipient', 'import_typecast':SEEmailRecipient, },
}
def __init__(self, recipient_list=None):
super(SEEmailRecipientSet, self).__init__()
if recipient_list is not None:
self.recipient_list = recipient_list
def __str__(self):
return ','.join([str(x) for x in self.recipient_list])
def find_address(self, address):
address = address.strip()
for recipient in self.recipient_list:
if address == recipient.address:
return recipient
return None
def add_address(self, address, filter_type=FILTER_AFTER_FIRST):
address = address.strip()
if not valid_email_address(address):
raise ProgramError(ERR_INVALID_EMAIL_ADDR, detail="address='%s'" % address)
return
recipient = self.find_address(address)
if recipient is not None:
return
self.recipient_list.append(SEEmailRecipient(address, filter_type))
def clear_recipient_list(self):
self.recipient_list = []
def parse_recipient_file(self, filepath):
comment_re = re.compile('#.*')
entry_re = re.compile('(\S+)(\s+(.+))?')
key_value_re = re.compile("(\w+)\s*=\s*(\S+)")
map_boolean = {'enabled' : True,
'true' : True,
'yes' : True,
'on' : True,
'disabled' : False,
'false' : False,
'no' : False,
'off' : False,
}
try:
f = open(filepath)
except IOError, e:
raise ProgramError(ERR_FILE_OPEN, detail="%s, %s" % (filepath, e.strerror))
self.clear_recipient_list()
for line in f.readlines():
line = comment_re.sub('', line)
line = line.strip()
if line:
match = entry_re.search(line)
if match:
address = match.group(1)
options = match.group(3)
filter_type = None
if options:
for match in key_value_re.finditer(options):
option = match.group(1)
value = match.group(2)
if option == 'filter_type':
filter_type = map_filter_name_to_value.get(value.lower(), None)
if filter_type is None:
log_email.warn("unknown email filter (%s) for address %s", option, address)
else:
log_email.warn("unknown email option (%s) for address %s", option, address)
try:
self.add_address(address, filter_type)
except ProgramError, e:
if e.errno == ERR_INVALID_EMAIL_ADDR:
log_email.warn(e.strerror)
else:
raise e
f.close()
def write_recipient_file(self, filepath):
try:
f = open(filepath, 'w')
except IOError, e:
raise ProgramError(ERR_FILE_OPEN, detail="%s, %s" % (filepath, e.strerror))
for recipient in self.recipient_list:
filter_type = map_filter_value_to_name[recipient.filter_type]
f.write("%-40s filter_type=%s\n" % (recipient.address, filter_type))
f.close()
#------------------------------------------------------------------------
if __name__ == '__main__':
import libxml2
#memory debug specific
libxml2.debugMemory(1)
xml_file = 'audit_listener_database.xml'
sigs = SEFaultSignatureSet()
sigs.read_xml_file(xml_file, 'sigs')
siginfo = sigs.signature_list[0]
record = siginfo.audit_event.records[0]
print record.record_type
print "siginfo.audit_event=%s" % siginfo.audit_event
print sigs
#memory debug specific
libxml2.cleanupParser()
if libxml2.debugMemory(1) == 0:
print "Memory OK"
else:
print "Memory leak %d bytes" % (libxml2.debugMemory(1))
libxml2.dumpMemory()