diff --git a/branches/ucs-3.0/ucs/management/univention-directory-manager-modules/modules/univention/admin/handlers/dns/__init__.py b/branches/ucs-3.0/ucs/management/univention-directory-manager-modules/modules/univention/admin/handlers/dns/__init__.py index e69de29..02b1135 100644 --- a/branches/ucs-3.0/ucs/management/univention-directory-manager-modules/modules/univention/admin/handlers/dns/__init__.py +++ b/branches/ucs-3.0/ucs/management/univention-directory-manager-modules/modules/univention/admin/handlers/dns/__init__.py @@ -0,0 +1,55 @@ +# vim: set fileencoding=utf-8 et sw=4 ts=4 : + +ARPA_IP4 = '.in-addr.arpa' +ARPA_IP6 = '.ip6.arpa' + +def makeContactPerson(obj, arg): + """Create contact Email-address for domain.""" + domain = obj.position.getDomain() + return 'root@%s.' % (domain.replace('dc=', '').replace(',','.'),) + +def unescapeSOAemail(email): + r""" + Un-escape Email-address from DNS SOA record. + >>> unescapeSOAemail(r'first\.last.domain.tld') + 'first.last@domain.tld' + """ + ret = '' + i = 0 + while i < len(email): + if email[i] == '\\': + i += 1 + if i >= len(email): + raise ValueError() + elif email[i] == '.': + i += 1 + if i >= len(email): + raise ValueError() + ret += '@' + ret += email[i:] + return ret + ret += email[i] + i += 1 + raise ValueError() + +def escapeSOAemail(email): + r""" + Escape Email-address for DNS SOA record. + >>> escapeSOAemail('first.last@domain.tld') + 'first\\.last.domain.tld' + """ + SPECIAL_CHARACTERS = set('"(),.:;<>@[\\]') + if not '@' in email: + raise ValueError() + (local, domain) = email.rsplit('@', 1) + tmp = '' + for c in local: + if c in SPECIAL_CHARACTERS: + tmp += '\\' + tmp += c + local = tmp + return local + '.' 
+ domain + +if __name__ == '__main__': + import doctest + doctest.testmod() diff --git a/branches/ucs-3.0/ucs/management/univention-directory-manager-modules/modules/univention/admin/handlers/dns/forward_zone.py b/branches/ucs-3.0/ucs/management/univention-directory-manager-modules/modules/univention/admin/handlers/dns/forward_zone.py index 4acf363..d9ed504 100644 --- a/branches/ucs-3.0/ucs/management/univention-directory-manager-modules/modules/univention/admin/handlers/dns/forward_zone.py +++ b/branches/ucs-3.0/ucs/management/univention-directory-manager-modules/modules/univention/admin/handlers/dns/forward_zone.py @@ -30,22 +30,17 @@ # /usr/share/common-licenses/AGPL-3; if not, see # . -import re - from univention.admin.layout import Tab, Group from univention.admin import configRegistry import univention.admin.filter import univention.admin.handlers import univention.admin.localization +from univention.admin.handlers.dns import ARPA_IP4, ARPA_IP6, escapeSOAemail, unescapeSOAemail translation=univention.admin.localization.translation('univention.admin.handlers.dns') _=translation.translate -def makeContactPerson(object, arg): - domain=object.position.getDomain() - return 'root@%s.' 
%(domain.replace(',dc=','.').replace('dc=','')) - module='dns/forward_zone' operations=['add','edit','remove','search'] usewizard=1 @@ -252,38 +247,6 @@ class object(univention.admin.handlers.simpleLdap): univention.admin.handlers.simpleLdap.__init__(self, co, lo, position, dn, superordinate, attributes = attributes ) - def unescapeSOAemail(self, email): - ret = '' - i = 0 - while i < len(email): - if email[i] == '\\': - i += 1 - if i >= len(email): - raise ValueError() - elif email[i] == '.': - i += 1 - if i >= len(email): - raise ValueError() - ret += '@' - ret += email[i:] - return ret - ret += email[i] - i += 1 - raise ValueError() - - def escapeSOAemail(self, email): - SPECIAL_CHARACTERS = set('"(),.:;<>@[\\]') - if not '@' in email: - raise ValueError() - (local, domain, ) = email.rsplit('@', 1) - tmp = '' - for c in local: - if c in SPECIAL_CHARACTERS: - tmp += '\\' - tmp += c - local = tmp - return local + '.' + domain - def open(self): univention.admin.handlers.simpleLdap.open(self) self.info['a'] = [] @@ -294,8 +257,8 @@ class object(univention.admin.handlers.simpleLdap): soa=self.oldattr.get('sOARecord',[''])[0].split(' ') if len(soa) > 6: - self['contact']=self.unescapeSOAemail(soa[1]) - self['serial']=soa[2] + self['contact'] = unescapeSOAemail(soa[1]) + self['serial'] = soa[2] self['refresh'] = univention.admin.mapping.unmapUNIX_TimeInterval( soa[3] ) self['retry'] = univention.admin.mapping.unmapUNIX_TimeInterval( soa[4] ) self['expire'] = univention.admin.mapping.unmapUNIX_TimeInterval( soa[5] ) @@ -316,17 +279,17 @@ class object(univention.admin.handlers.simpleLdap): ml=univention.admin.handlers.simpleLdap._ldap_modlist(self) if self.hasChanged(['nameserver', 'contact', 'serial', 'refresh', 'retry', 'expire', 'ttl']): if self['contact'] and not self['contact'].endswith('.'): - self['contact'] = '%s.' % self['contact'] + self['contact'] += '.' 
if len (self['nameserver'][0]) > 0 \ - and self['nameserver'][0].find (':') == -1 \ - and self['nameserver'][0].find ('.') != -1 \ - and not self['nameserver'][0][-1] == '.': - self['nameserver'][0] = '%s.' % self['nameserver'][0] + and ':' not in self['nameserver'][0] \ + and '.' in self['nameserver'][0] \ + and not self['nameserver'][0].endswith('.'): + self['nameserver'][0] += '.' refresh = univention.admin.mapping.mapUNIX_TimeInterval( self[ 'refresh' ] ) retry = univention.admin.mapping.mapUNIX_TimeInterval( self[ 'retry' ] ) expire = univention.admin.mapping.mapUNIX_TimeInterval( self[ 'expire' ] ) ttl = univention.admin.mapping.mapUNIX_TimeInterval( self[ 'ttl' ] ) - soa='%s %s %s %s %s %s %s' % (self['nameserver'][0], self.escapeSOAemail(self['contact']), self['serial'], refresh, retry, expire, ttl ) + soa='%s %s %s %s %s %s %s' % (self['nameserver'][0], escapeSOAemail(self['contact']), self['serial'], refresh, retry, expire, ttl ) ml.append(('sOARecord', self.oldattr.get('sOARecord', []), [soa])) oldAddresses = self.oldinfo.get('a') @@ -362,8 +325,8 @@ def lookup(co, lo, filter_s, base='', superordinate=None, scope='sub', unique=0, filter=univention.admin.filter.conjunction('&', [ univention.admin.filter.expression('objectClass', 'dNSZone'), univention.admin.filter.expression('relativeDomainName', '@'), - univention.admin.filter.conjunction('!', [univention.admin.filter.expression('zoneName', '*.in-addr.arpa')]), - univention.admin.filter.conjunction('!', [univention.admin.filter.expression('zoneName', '*.ip6.arpa')]), + univention.admin.filter.conjunction('!', [univention.admin.filter.expression('zoneName', '*%s' % ARPA_IP4)]), + univention.admin.filter.conjunction('!', [univention.admin.filter.expression('zoneName', '*%s' % ARPA_IP6)]), ]) if filter_s: @@ -376,6 +339,5 @@ def lookup(co, lo, filter_s, base='', superordinate=None, scope='sub', unique=0, res.append((object(co, lo, None, dn=dn, superordinate=superordinate, attributes = attrs ))) return res - 
def identify(dn, attr, canonical=0): - return 'dNSZone' in attr.get('objectClass', []) and ['@'] == attr.get('relativeDomainName', []) and not attr['zoneName'][0].endswith('.in-addr.arpa') and not attr['zoneName'][0].endswith('.ip6.arpa') + return 'dNSZone' in attr.get('objectClass', []) and ['@'] == attr.get('relativeDomainName', []) and not attr['zoneName'][0].endswith(ARPA_IP4) and not attr['zoneName'][0].endswith(ARPA_IP6) diff --git a/branches/ucs-3.0/ucs/management/univention-directory-manager-modules/modules/univention/admin/handlers/dns/reverse_zone.py b/branches/ucs-3.0/ucs/management/univention-directory-manager-modules/modules/univention/admin/handlers/dns/reverse_zone.py index cf387b9..4494ef2 100644 --- a/branches/ucs-3.0/ucs/management/univention-directory-manager-modules/modules/univention/admin/handlers/dns/reverse_zone.py +++ b/branches/ucs-3.0/ucs/management/univention-directory-manager-modules/modules/univention/admin/handlers/dns/reverse_zone.py @@ -30,15 +30,13 @@ # /usr/share/common-licenses/AGPL-3; if not, see # . -import string -import types - from univention.admin.layout import Tab, Group from univention.admin import configRegistry import univention.admin.filter import univention.admin.handlers import univention.admin.localization +from univention.admin.handlers.dns import ARPA_IP4, ARPA_IP6, escapeSOAemail, unescapeSOAemail translation=univention.admin.localization.translation('univention.admin.handlers.dns') _=translation.translate @@ -170,24 +168,41 @@ layout = [ ] def mapSubnet(subnet): + """ + Map subnet to reverse zone. 
+ >>> mapSubnet('0123:4567:89ab:cdef') + 'f.e.d.c.b.a.9.8.7.6.5.4.3.2.1.0.ip6.arpa' + >>> mapSubnet('1.2.3') + '3.2.1.in-addr.arpa' + """ if ':' in subnet: # IPv6 - return '.'.join(reversed(subnet.replace(':', ''))) + '.ip6.arpa' + return '%s%s' % ('.'.join(reversed(subnet.replace(':', ''))), ARPA_IP6) else: q=subnet.split('.') q.reverse() - return string.join(q, '.')+'.in-addr.arpa' + return '%s%s' % ('.'.join(q), ARPA_IP4) def unmapSubnet(zone): - if type(zone) == types.ListType: + """ + Map reverse zone to subnet. + >>> unmapSubnet('f.e.d.c.b.a.9.8.7.6.5.4.3.2.1.0.ip6.arpa') + '0123:4567:89ab:cdef' + >>> unmapSubnet('3.2.1.in-addr.arpa') + '1.2.3' + """ + if isinstance(zone, list): zone=zone[0] - if '.ip6.arpa' in zone: # IPv6 - zone = list(reversed(zone.replace('.ip6.arpa', '').split('.'))) + if zone.endswith(ARPA_IP6): # IPv6 + zone = zone[:-len(ARPA_IP6)] + zone = list(reversed(zone.split('.'))) return ':'.join([''.join(zone[i:i+4]) for i in xrange(0, len(zone), 4)]) - else: - zone=zone.replace('.in-addr.arpa', '') + elif zone.endswith(ARPA_IP4): # IPv4 + zone = zone[:-len(ARPA_IP4)] q=zone.split('.') q.reverse() - return string.join(q, '.') + return '.'.join(q) + else: + raise ValueError('Neither an IPv4 nor an IPv6 address') mapping=univention.admin.mapping.mapping() mapping.register('subnet', 'zoneName', mapSubnet, unmapSubnet) @@ -214,7 +229,7 @@ class object(univention.admin.handlers.simpleLdap): soa=self.oldattr.get('sOARecord',[''])[0].split(' ') if len(soa) > 6: - self['contact']=soa[1].replace('.','@',1) + self['contact'] = unescapeSOAemail(soa[1]) self['serial'] = soa[2] self['refresh'] = univention.admin.mapping.unmapUNIX_TimeInterval( soa[3] ) self['retry'] = univention.admin.mapping.unmapUNIX_TimeInterval( soa[4] ) @@ -229,12 +244,18 @@ class object(univention.admin.handlers.simpleLdap): def _ldap_modlist(self): ml=univention.admin.handlers.simpleLdap._ldap_modlist(self) if self.hasChanged(['nameserver', 'contact', 'serial', 'refresh', 'retry', 
'expire', 'ttl']): - + if self['contact'] and not self['contact'].endswith('.'): + self['contact'] += '.' + if len(self['nameserver'][0]) > 0 \ + and ':' not in self['nameserver'][0] \ + and '.' in self['nameserver'][0] \ + and not self['nameserver'][0].endswith('.'): + self['nameserver'][0] += '.' refresh = univention.admin.mapping.mapUNIX_TimeInterval( self[ 'refresh' ] ) retry = univention.admin.mapping.mapUNIX_TimeInterval( self[ 'retry' ] ) expire = univention.admin.mapping.mapUNIX_TimeInterval( self[ 'expire' ] ) ttl = univention.admin.mapping.mapUNIX_TimeInterval( self[ 'ttl' ] ) - soa = '%s %s %s %s %s %s %s' % ( self[ 'nameserver' ][ 0 ], self[ 'contact' ].replace( '@', '.', 1 ), self[ 'serial' ], refresh, retry, expire, ttl ) + soa = '%s %s %s %s %s %s %s' % (self['nameserver'][0], escapeSOAemail(self['contact']), self['serial'], refresh, retry, expire, ttl) ml.append(('sOARecord', self.oldattr.get('sOARecord', []), soa)) return ml @@ -266,8 +287,8 @@ def lookup(co, lo, filter_s, base='', superordinate=None, scope='sub', unique=0, univention.admin.filter.expression('objectClass', 'dNSZone'), univention.admin.filter.expression('relativeDomainName', '@'), univention.admin.filter.conjunction('|', [ - univention.admin.filter.expression('zoneName', '*.in-addr.arpa'), - univention.admin.filter.expression('zoneName', '*.ip6.arpa') + univention.admin.filter.expression('zoneName', '*%s' % ARPA_IP4), + univention.admin.filter.expression('zoneName', '*%s' % ARPA_IP6) ]), ]) @@ -282,11 +303,13 @@ def lookup(co, lo, filter_s, base='', superordinate=None, scope='sub', unique=0, return res def identify(dn, attr): - return 'dNSZone' in attr.get('objectClass', []) and\ ['@'] == attr.get('relativeDomainName', []) and\ - (attr['zoneName'][0].endswith('.in-addr.arpa') or attr['zoneName'][0].endswith('.ip6.arpa')) + (attr['zoneName'][0].endswith(ARPA_IP4) or attr['zoneName'][0].endswith(ARPA_IP6)) def quickDescription(rdn): - return unmapSubnet(rdn) + +if __name__ == 
'__main__': + import doctest + doctest.testmod()