python:gp: Do not print an error if template already exists
[janger/samba-autobuild-v4-20-test/.git] / python / samba / gp / gp_cert_auto_enroll_ext.py
1 # gp_cert_auto_enroll_ext samba group policy
2 # Copyright (C) David Mulder <dmulder@suse.com> 2021
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 import os
18 import operator
19 import requests
20 from samba.gp.gpclass import gp_pol_ext, gp_applier, GPOSTATE
21 from samba import Ldb
22 from ldb import SCOPE_SUBTREE, SCOPE_BASE
23 from samba.auth import system_session
24 from samba.gp.gpclass import get_dc_hostname
25 import base64
26 from shutil import which
27 from subprocess import Popen, PIPE
28 import re
29 import json
30 from samba.gp.util.logging import log
31 import struct
32 try:
33     from cryptography.hazmat.primitives.serialization.pkcs7 import \
34         load_der_pkcs7_certificates
35 except ModuleNotFoundError:
36     def load_der_pkcs7_certificates(x): return []
37     log.error('python cryptography missing pkcs7 support. '
38               'Certificate chain parsing will fail')
39 from cryptography.hazmat.primitives.serialization import Encoding
40 from cryptography.x509 import load_der_x509_certificate
41 from cryptography.hazmat.backends import default_backend
42 from samba.common import get_string
43
# PEM armour used by format_root_cert(); the (already line-wrapped) base64
# payload is substituted for the %s placeholder.
cert_wrap = b"""
-----BEGIN CERTIFICATE-----
%s
-----END CERTIFICATE-----"""
# Pattern matched against a policy server URL by
# obtain_end_point_information(); the named groups capture the server host
# name and the authentication suffix of the CEP service path.
endpoint_re = '(https|HTTPS)://(?P<server>[a-zA-Z0-9.-]+)/ADPolicyProvider' + \
              '_CEP_(?P<auth>[a-zA-Z]+)/service.svc/CEP'

# Candidate system trust anchor directories, probed in order by
# find_global_trust_dir(); the first entry doubles as the fallback.
global_trust_dirs = ['/etc/pki/trust/anchors',           # SUSE
                     '/etc/pki/ca-trust/source/anchors', # RHEL/Fedora
                     '/usr/local/share/ca-certificates'] # Debian/Ubuntu
54
def octet_string_to_objectGUID(data):
    """Convert an octet string to an objectGUID.

    :param data: The 16 byte binary GUID. The first three fields are read
                 as little-endian, the remaining two as big-endian.
    :return: The lower case, hyphen separated textual form
             (8-4-4-4-12 hexadecimal digits).
    :raises struct.error: If data does not have the expected length.
    """
    data1, data2, data3 = struct.unpack('<LHH', data[0:8])
    data4, node_high, node_low = struct.unpack('>HHL', data[8:])
    # Every field must be zero padded to its full width, otherwise a GUID
    # containing leading zero bytes is rendered too short and can never
    # match the textual PolicyID it is compared against.
    return '%08x-%04x-%04x-%04x-%04x%08x' % (data1, data2, data3, data4,
                                             node_high, node_low)
62
63
def group_and_sort_end_point_information(end_point_information):
    """Group and Sort End Point Information.

    [MS-CAESO] 4.4.5.3.2.3
    The end points are bucketed by their PolicyID and every bucket is put
    into the order in which autoenrollment will try the end points.

    :param end_point_information: Iterable of dicts providing the keys
                                  'PolicyID' and 'Cost'; 'AuthFlags' is
                                  read for end points sharing a Cost.
    :return: A list with one list of end point dicts per PolicyID.
    """
    def auth_rank(end_point):
        # Kerberos (0x2) first, Anonymous (0x1) second, anything else last.
        flags = end_point['AuthFlags']
        if flags == 0x2:
            return 0
        if flags == 0x1:
            return 1
        return 2

    # Bucket the end points which share the same EndPoint.PolicyID.
    buckets = {}
    for end_point in end_point_information:
        buckets.setdefault(end_point['PolicyID'], []).append(end_point)

    for members in buckets.values():
        # Ascending EndPoint.Cost is the primary ordering.
        members.sort(key=lambda end_point: end_point['Cost'])

        # Walk the runs of equal Cost; only runs holding more than one end
        # point are re-ordered by their authentication type.
        start = 0
        total = len(members)
        while start < total:
            stop = start + 1
            while stop < total and \
                    members[stop]['Cost'] == members[start]['Cost']:
                stop += 1
            if stop - start > 1:
                members[start:stop] = sorted(members[start:stop],
                                             key=auth_rank)
            start = stop
    return list(buckets.values())
111
def obtain_end_point_information(entries):
    """Obtain End Point Information.

    [MS-CAESO] 4.4.5.3.2.2
    Builds the CertificateEnrollmentPolicyEndPoints table from the policy
    entries located below the PolicyServers registry key.

    :param entries: Policy entries exposing keyname, valuename and data.
    :return: The grouped and sorted end points, or an empty dict if one of
             the URLs is neither a recognized CEP endpoint nor 'ldap:'.
    """
    prefix = 'Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\'
    servers = {}
    for entry in entries:
        if entry.keyname.startswith(prefix):
            server_key = entry.keyname.replace(prefix, '')
            servers.setdefault(server_key, {})[entry.valuename] = entry.data

    for server in servers.values():
        url = server['URL']
        match = re.match(endpoint_re, url)
        if match is not None:
            # Derive the CA name from the host name of the endpoint.
            host = match.group('server')
            server['name'] = '%s-CA' % host.replace('.', '-')
            server['hostname'] = host
            server['auth'] = match.group('auth')
            continue
        if url.lower() != 'ldap:':
            log.error('Failed to parse the endpoint', { 'endpoint': url })
            return {}

    return group_and_sort_end_point_information(servers.values())
142
def fetch_certification_authorities(ldb):
    """Initialize CAs.

    [MS-CAESO] 4.4.5.3.1.2
    Searches the Enrollment Services container of the configuration
    partition for pKIEnrollmentService objects.

    :param ldb: Connection used for the LDAP search.
    :return: A list of dicts with the keys 'name', 'hostname' and
             'cACertificate' (the certificate as base64 text).
    """
    # Autoenrollment MUST do an LDAP search for the CA information
    # (pKIEnrollmentService) objects under the following container:
    container = 'CN=Enrollment Services,CN=Public Key Services,CN=Services,CN=Configuration,%s' % ldb.get_default_basedn()
    services = ldb.search(container, SCOPE_SUBTREE,
                          '(objectClass=pKIEnrollmentService)',
                          ['cACertificate', 'cn', 'dNSHostName'])
    authorities = []
    for service in services:
        authorities.append({
            'name': get_string(service['cn'][0]),
            'hostname': get_string(service['dNSHostName'][0]),
            'cACertificate': get_string(
                base64.b64encode(service['cACertificate'][0])),
        })
    return authorities
165
def fetch_template_attrs(ldb, name, attrs=None):
    """Fetch attributes of a certificate template from the directory.

    :param ldb: Connection used for the LDAP search.
    :param name: The cn of the certificate template, as str or bytes.
    :param attrs: Attributes to request, by default only
                  'msPKI-Minimal-Key-Size'.
    :return: The attributes of the template as a dict. If the template is
             not found exactly once, or it carries no
             'msPKI-Minimal-Key-Size', a dict holding a default key size
             of 2048 is returned instead.
    """
    if attrs is None:
        attrs = ['msPKI-Minimal-Key-Size']
    # The template names parsed from the cepces-submit output are bytes.
    # Formatting bytes with %s yields "b'name'", which never matches any
    # template, so convert to text before building the filter.
    if isinstance(name, bytes):
        name = name.decode()
    basedn = ldb.get_default_basedn()
    dn = 'CN=Certificate Templates,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn
    expr = '(cn=%s)' % name
    res = ldb.search(dn, SCOPE_SUBTREE, expr, attrs)
    if len(res) == 1 and 'msPKI-Minimal-Key-Size' in res[0]:
        return dict(res[0])
    else:
        return {'msPKI-Minimal-Key-Size': ['2048']}
177
def format_root_cert(cert):
    """Wrap a base64 certificate string into PEM armour.

    :param cert: The base64 encoded certificate as str.
    :return: The certificate as bytes, with a newline inserted after every
             64 characters and surrounded by the BEGIN/END markers.
    """
    chunker = re.compile(b"(.{64})", re.DOTALL)
    wrapped = chunker.sub(b"\\1\n", cert.encode())
    return cert_wrap % wrapped
180
def find_cepces_submit():
    """Locate the cepces-submit helper.

    The directories of the PATH environment variable are searched first,
    followed by the certmonger helper directories.

    :return: The path of cepces-submit, or None if it was not found.
    """
    certmonger_dirs = [os.environ.get("PATH"), '/usr/lib/certmonger',
                       '/usr/libexec/certmonger']
    # PATH may be unset, in which case joining would fail on None.
    search_path = os.pathsep.join(d for d in certmonger_dirs
                                  if d is not None)
    return which('cepces-submit', path=search_path)
185
def get_supported_templates(server):
    """Ask a CA for the certificate templates it supports.

    Runs cepces-submit with CERTMONGER_OPERATION set to
    GET-SUPPORTED-TEMPLATES against the given server.

    :param server: Host name of the enrollment server.
    :return: The whitespace separated tokens of the helper's standard
             output as a list of bytes, or an empty list if cepces-submit
             is not installed.
    """
    cepces_submit = find_cepces_submit()
    if not cepces_submit:
        log.error('Failed to find cepces-submit')
        return []

    # Work on a copy: assigning to os.environ itself would export the
    # variable to this process and to every child started afterwards.
    env = dict(os.environ)
    env['CERTMONGER_OPERATION'] = 'GET-SUPPORTED-TEMPLATES'
    p = Popen([cepces_submit, '--server=%s' % server, '--auth=Kerberos'],
              env=env, stdout=PIPE, stderr=PIPE)
    out, err = p.communicate()
    if p.returncode != 0:
        data = {'Error': err.decode()}
        log.error('Failed to fetch the list of supported templates.', data)
    return out.strip().split()
201
202
def _load_der_certificate(der_data):
    """Parse a DER encoded X.509 certificate."""
    # Older versions of load_der_x509_certificate require a backend param
    try:
        return load_der_x509_certificate(der_data)
    except TypeError:
        return load_der_x509_certificate(der_data, default_backend())


def getca(ca, url, trust_dir):
    """Fetch Certificate Chain from the CA.

    The chain is requested with a GetCACert operation from the given url.
    If that yields nothing usable, the certificate stored in
    ca['cACertificate'] (if any) is installed instead.

    :param ca: Dict describing the CA; 'name' is required, the base64
               encoded 'cACertificate' is optional.
    :param url: The URL the chain is requested from.
    :param trust_dir: Directory the PEM files are written to.
    :return: The list of certificate files which have been written.
    """
    root_cert = os.path.join(trust_dir, '%s.crt' % ca['name'])
    root_certs = []

    try:
        r = requests.get(url=url, params={'operation': 'GetCACert',
                                          'message': 'CAIdentifier'})
    except requests.exceptions.ConnectionError:
        log.warn('Failed to establish a new connection')
        r = None
    # A response is not required to carry a Content-Type header, so look
    # it up without raising a KeyError.
    content_type = r.headers.get('Content-Type') if r is not None else None
    if r is None or r.content == b'' or content_type == 'text/html':
        log.warn('Failed to fetch the root certificate chain.')
        log.warn('The Network Device Enrollment Service is either not' +
                 ' installed or not configured.')
        if 'cACertificate' in ca:
            log.warn('Installing the server certificate only.')
            der_certificate = base64.b64decode(ca['cACertificate'])
            cert = _load_der_certificate(der_certificate)
            cert_data = cert.public_bytes(Encoding.PEM)
            with open(root_cert, 'wb') as w:
                w.write(cert_data)
            root_certs.append(root_cert)
        return root_certs

    if content_type == 'application/x-x509-ca-cert':
        # The response is a single DER encoded certificate
        cert = _load_der_certificate(r.content)
        cert_data = cert.public_bytes(Encoding.PEM)
        with open(root_cert, 'wb') as w:
            w.write(cert_data)
        root_certs.append(root_cert)
    elif content_type == 'application/x-x509-ca-ra-cert':
        # The response is a PKCS#7 bundle, every certificate is written
        # to its own, numbered file.
        certs = load_der_pkcs7_certificates(r.content)
        filename, extension = root_cert.rsplit('.', 1)
        for i, chain_cert in enumerate(certs):
            cert = chain_cert.public_bytes(Encoding.PEM)
            dest = '%s.%d.%s' % (filename, i, extension)
            with open(dest, 'wb') as w:
                w.write(cert)
            root_certs.append(dest)
    else:
        log.warn('getca: Wrong (or missing) MIME content type')

    return root_certs
255
256
def find_global_trust_dir():
    """Return the global trust dir using known paths from various Linux distros.

    The first known path which is an existing directory wins; if none of
    them exists the first known path is returned.
    """
    existing = (candidate for candidate in global_trust_dirs
                if os.path.isdir(candidate))
    return next(existing, global_trust_dirs[0])
263
def update_ca_command():
    """Return the command to update the CA trust store.

    update-ca-certificates is preferred over update-ca-trust; None is
    returned if neither of them is found.
    """
    for tool in ('update-ca-certificates', 'update-ca-trust'):
        located = which(tool)
        if located:
            return located
    return None
267
def changed(new_data, old_data):
    """Return True if any key present in both dicts has changed.

    Keys which exist in only one of the two dicts are ignored.
    """
    for key, value in new_data.items():
        if key in old_data and value != old_data[key]:
            return True
    return False
272
def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'):
    """Install the root certificate chain.

    Fetches the certificates of the CA, links them into the global trust
    dir, registers the CA with certmonger and requests a certificate for
    every template the CA supports.

    :param ca: Dict describing the CA, providing 'name' and 'hostname'.
    :param ldb: Connection used to look up the template attributes.
    :param trust_dir: Directory the certificates are stored in.
    :param private_dir: Directory the private keys are stored in.
    :param auth: Authentication mechanism handed to cepces-submit.
    :return: A JSON string of the ca data, extended by the lists 'files'
             and 'templates' describing what has been set up.
    """
    data = dict({'files': [], 'templates': []}, **ca)
    url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % ca['hostname']

    log.info("Try to get root or server certificates")

    root_certs = getca(ca, url, trust_dir)
    data['files'].extend(root_certs)
    global_trust_dir = find_global_trust_dir()
    for src in root_certs:
        # Symlink the certs to global trust dir
        dst = os.path.join(global_trust_dir, os.path.basename(src))
        try:
            os.symlink(src, dst)
            data['files'].append(dst)
            log.info("Created symlink: %s -> %s" % (src, dst))
        except PermissionError:
            log.warn('Failed to symlink root certificate to the'
                     ' admin trust anchors')
        except FileNotFoundError:
            log.warn('Failed to symlink root certificate to the'
                     ' admin trust anchors.'
                     ' The directory was not found', global_trust_dir)
        except FileExistsError:
            # If we're simply downloading a renewed cert, the symlink
            # already exists. Ignore the FileExistsError. Preserve the
            # existing symlink in the unapply data.
            data['files'].append(dst)

    update = update_ca_command()
    if update is not None:
        log.info("Running %s" % (update))
        ret = Popen([update]).wait()
        if ret != 0:
            log.error('Failed to run %s' % (update))

    # Setup Certificate Auto Enrollment
    getcert = which('getcert')
    cepces_submit = find_cepces_submit()
    if getcert is not None and cepces_submit is not None:
        p = Popen([getcert, 'add-ca', '-c', ca['name'], '-e',
                  '%s --server=%s --auth=%s' % (cepces_submit,
                  ca['hostname'], auth)],
                  stdout=PIPE, stderr=PIPE)
        out, err = p.communicate()
        log.debug(out.decode())
        if p.returncode != 0:
            if p.returncode == 2:
                log.info('The CA [%s] already exists' % ca['name'])
            else:
                # Use a separate dict for the log details, 'data' is the
                # result which is still being filled in and returned.
                edata = {'Error': err.decode(), 'CA': ca['name']}
                log.error('Failed to add Certificate Authority', edata)

        supported_templates = get_supported_templates(ca['hostname'])
        for template in supported_templates:
            # The template names are parsed from process output as bytes
            template_name = template.decode()
            attrs = fetch_template_attrs(ldb, template_name)
            nickname = '%s.%s' % (ca['name'], template_name)
            keyfile = os.path.join(private_dir, '%s.key' % nickname)
            certfile = os.path.join(trust_dir, '%s.crt' % nickname)
            p = Popen([getcert, 'request', '-c', ca['name'],
                       '-T', template_name,
                       '-I', nickname, '-k', keyfile, '-f', certfile,
                       '-g', attrs['msPKI-Minimal-Key-Size'][0]],
                       stdout=PIPE, stderr=PIPE)
            out, err = p.communicate()
            log.debug(out.decode())
            if p.returncode != 0:
                if p.returncode == 2:
                    log.info('The template [%s] already exists' % (nickname))
                else:
                    edata = {'Error': err.decode(), 'Certificate': nickname}
                    log.error('Failed to request certificate', edata)

            data['files'].extend([keyfile, certfile])
            data['templates'].append(nickname)
        if update is not None:
            Popen([update]).wait()
    else:
        log.warn('certmonger and cepces must be installed for ' +
                 'certificate auto enrollment to work')
    return json.dumps(data)
355
class gp_cert_auto_enroll_ext(gp_pol_ext, gp_applier):
    """Group policy extension for certificate auto enrollment.

    Processes the AEPolicy value of the Cryptography\\AutoEnrollment
    registry policy and enrolls with the configured certificate
    authorities.
    """

    def __str__(self):
        return r'Cryptography\AutoEnrollment'

    def unapply(self, guid, attribute, value):
        """Remove a previously enrolled CA.

        :param guid: The GPO the cached attribute belongs to.
        :param attribute: The base64 encoded name of the CA.
        :param value: The JSON string which cert_enroll returned when the
                      policy was applied.
        """
        ca_cn = base64.b64decode(attribute)
        data = json.loads(value)
        getcert = which('getcert')
        if getcert is not None:
            Popen([getcert, 'remove-ca', '-c', ca_cn]).wait()
            for nickname in data.get('templates', []):
                Popen([getcert, 'stop-tracking', '-i', nickname]).wait()
        for f in data.get('files', []):
            # The certificates are listed before the symlinks pointing to
            # them, so the links are dangling by the time they are
            # reached. lexists() is also true for a broken symlink.
            if os.path.lexists(f):
                os.unlink(f)
        self.cache_remove_attribute(guid, attribute)

    def apply(self, guid, ca, applier_func, *args, **kwargs):
        """Apply the policy for a CA unless it is applied and unchanged.

        :param guid: The GPO the policy belongs to.
        :param ca: Dict describing the CA, providing 'name' and 'hostname'.
        :param applier_func: Called with args and kwargs to apply the
                             policy; its return value is cached.
        """
        attribute = base64.b64encode(ca['name'].encode()).decode()
        old_val = self.cache_get_attribute_value(guid, attribute)
        old_data = json.loads(old_val) if old_val is not None else {}
        templates = ['%s.%s' % (ca['name'], t.decode()) for t in get_supported_templates(ca['hostname'])] \
            if old_val is not None else []
        new_data = { 'templates': templates, **ca }
        policy_changed = changed(new_data, old_data)
        enforce = self.cache_get_apply_state() == GPOSTATE.ENFORCE

        if old_val is not None:
            # If policy is already applied and unchanged, skip application
            if not policy_changed and not enforce:
                return
            # If the policy has changed, unapply, then apply new policy.
            # Without a cached value there is nothing to unapply.
            self.unapply(guid, attribute, old_val)

        # Apply the policy and log the changes
        data = applier_func(*args, **kwargs)
        self.cache_add_attribute(guid, attribute, data)

    def process_group_policy(self, deleted_gpo_list, changed_gpo_list,
                             trust_dir=None, private_dir=None):
        """Unapply the deleted and apply the changed policies.

        :param deleted_gpo_list: Pairs of GPO guid and the cached settings
                                 to remove.
        :param changed_gpo_list: The GPOs to process.
        :param trust_dir: Directory for the certificates, defaults to
                          'certs' below the cache path.
        :param private_dir: Directory for the private keys, defaults to
                            'certs' below the private path.
        """
        if trust_dir is None:
            trust_dir = self.lp.cache_path('certs')
        if private_dir is None:
            private_dir = self.lp.private_path('certs')
        if not os.path.exists(trust_dir):
            os.mkdir(trust_dir, mode=0o755)
        if not os.path.exists(private_dir):
            os.mkdir(private_dir, mode=0o700)

        for guid, settings in deleted_gpo_list:
            if str(self) in settings:
                for ca_cn_enc, data in settings[str(self)].items():
                    self.unapply(guid, ca_cn_enc, data)

        for gpo in changed_gpo_list:
            if gpo.file_sys_path:
                section = r'Software\Policies\Microsoft\Cryptography\AutoEnrollment'
                pol_file = 'MACHINE/Registry.pol'
                path = os.path.join(gpo.file_sys_path, pol_file)
                pol_conf = self.parse(path)
                if not pol_conf:
                    continue
                for e in pol_conf.entries:
                    if e.keyname == section and e.valuename == 'AEPolicy':
                        # This policy applies as specified in [MS-CAESO] 4.4.5.1
                        if e.data & 0x8000:
                            continue # The policy is disabled
                        enroll = e.data & 0x1 == 0x1
                        if enroll:
                            ca_names = self.__enroll(gpo.name,
                                                     pol_conf.entries,
                                                     trust_dir, private_dir)

                            # Cleanup any old CAs that have been removed
                            ca_attrs = [base64.b64encode(n.encode()).decode()
                                    for n in ca_names]
                            self.clean(gpo.name, keep=ca_attrs)
                        else:
                            # If enrollment has been disabled for this GPO,
                            # remove any existing policy
                            ca_attrs = \
                                self.cache_get_all_attribute_values(gpo.name)
                            self.clean(gpo.name, remove=list(ca_attrs.keys()))

    def __read_cep_data(self, guid, ldb, end_point_information,
                        trust_dir, private_dir):
        """Read CEP Data.

        [MS-CAESO] 4.4.5.3.2.4
        In this step autoenrollment initializes instances of the
        CertificateEnrollmentPolicy by accessing end points associated with CEP
        groups created in the previous step.

        :return: The names of the CAs the policy has been applied for.
                 This is always a list, also if every group is skipped.
        """
        ca_names = []
        # For each group created in the previous step:
        for end_point_group in end_point_information:
            # Pick an arbitrary instance of the
            # CertificateEnrollmentPolicyEndPoint from the group
            e = end_point_group[0]

            # If this instance does not have the AutoEnrollmentEnabled flag set
            # in the EndPoint.Flags, continue with the next group.
            if not e['Flags'] & 0x10:
                continue

            # If the current group contains a
            # CertificateEnrollmentPolicyEndPoint instance with EndPoint.URI
            # equal to "LDAP" (in any case, as accepted when the end points
            # are parsed):
            if any(ep['URL'].lower() == 'ldap:' for ep in end_point_group):
                # Perform an LDAP search to read the value of the objectGuid
                # attribute of the root object of the forest root domain NC. If
                # any errors are encountered, continue with the next group.
                res = ldb.search('', SCOPE_BASE, '(objectClass=*)',
                                 ['rootDomainNamingContext'])
                if len(res) != 1:
                    continue
                res2 = ldb.search(res[0]['rootDomainNamingContext'][0],
                                  SCOPE_BASE, '(objectClass=*)',
                                  ['objectGUID'])
                if len(res2) != 1:
                    continue

                # Compare the value read in the previous step to the
                # EndPoint.PolicyId datum CertificateEnrollmentPolicyEndPoint
                # instance. If the values do not match, continue with the next
                # group.
                objectGUID = '{%s}' % \
                    octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper()
                if objectGUID != e['PolicyID']:
                    continue

            # For each CertificateEnrollmentPolicyEndPoint instance for that
            # group:
            for ca in end_point_group:
                # If EndPoint.URI equals "LDAP":
                if ca['URL'].lower() == 'ldap:':
                    # This is a basic configuration.
                    cas = fetch_certification_authorities(ldb)
                    for _ca in cas:
                        self.apply(guid, _ca, cert_enroll, _ca, ldb, trust_dir,
                                   private_dir)
                        ca_names.append(_ca['name'])
                # If EndPoint.URI starts with "HTTPS//":
                elif ca['URL'].lower().startswith('https://'):
                    self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir,
                               private_dir, auth=ca['auth'])
                    ca_names.append(ca['name'])
                else:
                    edata = { 'endpoint': ca['URL'] }
                    log.error('Unrecognized endpoint', edata)
        return ca_names

    def __enroll(self, guid, entries, trust_dir, private_dir):
        """Enroll with the CAs configured for a GPO.

        If the policy entries provide end point information it is used,
        otherwise the CAs published in the directory are enrolled.

        :return: The names of the CAs the policy has been applied for.
        """
        url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
        ldb = Ldb(url=url, session_info=system_session(),
                  lp=self.lp, credentials=self.creds)

        ca_names = []
        end_point_information = obtain_end_point_information(entries)
        if len(end_point_information) > 0:
            ca_names.extend(self.__read_cep_data(guid, ldb,
                                                 end_point_information,
                                                 trust_dir, private_dir))
        else:
            cas = fetch_certification_authorities(ldb)
            for ca in cas:
                self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir,
                           private_dir)
                ca_names.append(ca['name'])
        return ca_names

    def rsop(self, gpo):
        """Describe the auto enrollment policy a GPO would apply.

        :param gpo: The GPO to inspect.
        :return: A dict which is empty, or maps 'Auto Enrollment Policy' to
                 a dict with one entry per CA name.
        """
        output = {}
        pol_file = 'MACHINE/Registry.pol'
        section = r'Software\Policies\Microsoft\Cryptography\AutoEnrollment'
        policy = 'Auto Enrollment Policy'
        if gpo.file_sys_path:
            path = os.path.join(gpo.file_sys_path, pol_file)
            pol_conf = self.parse(path)
            if not pol_conf:
                return output
            for e in pol_conf.entries:
                if e.keyname == section and e.valuename == 'AEPolicy':
                    enroll = e.data & 0x1 == 0x1
                    if e.data & 0x8000 or not enroll:
                        continue
                    output[policy] = {}
                    url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
                    ldb = Ldb(url=url, session_info=system_session(),
                              lp=self.lp, credentials=self.creds)
                    end_point_information = \
                        obtain_end_point_information(pol_conf.entries)
                    cas = fetch_certification_authorities(ldb)
                    if len(end_point_information) > 0:
                        cas2 = [ep for sl in end_point_information for ep in sl]
                        if any(ep['URL'].lower() == 'ldap:' for ep in cas2):
                            cas.extend(cas2)
                        else:
                            cas = cas2
                    for ca in cas:
                        # The LDAP end point itself is no CA, it only
                        # carries no 'name' or 'hostname' to report.
                        if 'URL' in ca and ca['URL'].lower() == 'ldap:':
                            continue
                        cn = ca['name']
                        output[policy][cn] = {}
                        if 'cACertificate' in ca:
                            output[policy][cn]['CA Certificate'] = \
                                format_root_cert(ca['cACertificate']).decode()
                        output[policy][cn]['Auto Enrollment Server'] = \
                            ca['hostname']
                        supported_templates = \
                            get_supported_templates(ca['hostname'])
                        output[policy][cn]['Templates'] = \
                            [t.decode() for t in supported_templates]
        return output