51b2874b371cc04cdee6106505866c0e1a13c9af
[janger/samba-autobuild-v4-20-test/.git] / python / samba / gp / gp_cert_auto_enroll_ext.py
1 # gp_cert_auto_enroll_ext samba group policy
2 # Copyright (C) David Mulder <dmulder@suse.com> 2021
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 import os
18 import operator
19 import requests
20 from samba.gp.gpclass import gp_pol_ext, gp_applier, GPOSTATE
21 from samba import Ldb
22 from ldb import SCOPE_SUBTREE, SCOPE_BASE
23 from samba.auth import system_session
24 from samba.gp.gpclass import get_dc_hostname
25 import base64
26 from shutil import which
27 from subprocess import Popen, PIPE
28 import re
29 import json
30 from samba.gp.util.logging import log
31 import struct
32 try:
33     from cryptography.hazmat.primitives.serialization.pkcs7 import \
34         load_der_pkcs7_certificates
35 except ModuleNotFoundError:
36     def load_der_pkcs7_certificates(x): return []
37     log.error('python cryptography missing pkcs7 support. '
38               'Certificate chain parsing will fail')
39 from cryptography.hazmat.primitives.serialization import Encoding
40 from cryptography.x509 import load_der_x509_certificate
41 from cryptography.hazmat.backends import default_backend
42 from samba.common import get_string
43
# PEM armor template (bytes). The single %s placeholder is filled, via bytes
# %-formatting, with the base64 certificate body (see format_root_cert).
cert_wrap = b"""
-----BEGIN CERTIFICATE-----
%s
-----END CERTIFICATE-----"""
# Pattern for a Certificate Enrollment Policy (CEP) endpoint URL. The named
# groups capture the server host name ('server') and the authentication
# scheme suffix of the ADPolicyProvider_CEP_<auth> path component ('auth').
endpoint_re = '(https|HTTPS)://(?P<server>[a-zA-Z0-9.-]+)/ADPolicyProvider' + \
              '_CEP_(?P<auth>[a-zA-Z]+)/service.svc/CEP'

# Candidate system-wide trust anchor directories, probed in this order by
# find_global_trust_dir(); the first entry is also its fallback.
global_trust_dirs = ['/etc/pki/trust/anchors',           # SUSE
                     '/etc/pki/ca-trust/source/anchors', # RHEL/Fedora
                     '/usr/local/share/ca-certificates'] # Debian/Ubuntu
54
def octet_string_to_objectGUID(data):
    """Convert a 16 byte octet string to an objectGUID string.

    The first three fields are stored little-endian and the remaining two
    big-endian. Every field is rendered zero-padded to its full width
    (8-4-4-4-12 hex digits) so that values with leading zero bytes still
    produce a well-formed GUID.

    :param data: The 16 raw bytes of the objectGUID attribute
    :return: Lower-case GUID string without surrounding braces
    :raises struct.error: If data does not have the expected length
    """
    data1, data2, data3 = struct.unpack('<LHH', data[0:8])
    # data[8:] (rather than data[8:16]) keeps rejecting over-long input.
    data4_hi, node_hi, node_lo = struct.unpack('>HHL', data[8:])
    return '%08x-%04x-%04x-%04x-%04x%08x' % (data1, data2, data3,
                                             data4_hi, node_hi, node_lo)
62
63
def group_and_sort_end_point_information(end_point_information):
    """Group and Sort End Point Information.

    [MS-CAESO] 4.4.5.3.2.3
    The end points are bucketed by their PolicyID and every bucket is put
    into the order in which autoenrollment will try the end points.

    :param end_point_information: Iterable of end point dicts carrying at
                                  least the 'PolicyID' and 'Cost' keys
    :return: A list of lists, one sorted list per distinct PolicyID
    """
    def auth_rank(end_point):
        # Kerberos (0x2) first, then Anonymous (0x1), then anything else.
        flags = end_point['AuthFlags']
        if flags == 0x2:
            return 0
        if flags == 0x1:
            return 1
        return 2

    # Bucket the CertificateEnrollmentPolicyEndPoint instances that share
    # the same EndPoint.PolicyID datum.
    buckets = {}
    for end_point in end_point_information:
        buckets.setdefault(end_point['PolicyID'], []).append(end_point)

    for members in buckets.values():
        # Ascending order of EndPoint.Cost.
        members.sort(key=lambda end_point: end_point['Cost'])

        # Walk the runs of equal cost; only runs with more than one member
        # are re-ordered by their authentication type.
        start = 0
        total = len(members)
        while start < total:
            stop = start + 1
            while stop < total and \
                    members[stop]['Cost'] == members[start]['Cost']:
                stop += 1
            if stop - start > 1:
                members[start:stop] = sorted(members[start:stop],
                                             key=auth_rank)
            start = stop
    return list(buckets.values())
111
def obtain_end_point_information(entries):
    """Obtain End Point Information.

    [MS-CAESO] 4.4.5.3.2.2
    Builds the CertificateEnrollmentPolicyEndPoints table from the policy
    entries found below the PolicyServers registry key.

    :param entries: Policy entries exposing keyname, valuename and data
    :return: The grouped and sorted end points (a list of lists), or an
             empty dict if an end point URL could not be parsed
    """
    section = 'Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\'
    servers = {}
    for entry in entries:
        if e_matches := entry.keyname.startswith(section):
            server_key = entry.keyname.replace(section, '')
            servers.setdefault(server_key, {})[entry.valuename] = entry.data

    for end_point in servers.values():
        url = end_point['URL']
        parsed = re.match(endpoint_re, url)
        if parsed:
            server = parsed.group('server')
            end_point['name'] = '%s-CA' % server.replace('.', '-')
            end_point['hostname'] = server
            end_point['auth'] = parsed.group('auth')
            continue
        # An unparsable URL is only acceptable for the plain LDAP end point.
        if url.lower() != 'ldap:':
            edata = { 'endpoint': url }
            log.error('Failed to parse the endpoint', edata)
            return {}
    return group_and_sort_end_point_information(servers.values())
142
def fetch_certification_authorities(ldb):
    """Initialize CAs.

    [MS-CAESO] 4.4.5.3.1.2
    Searches the Enrollment Services container of the configuration
    partition for pKIEnrollmentService objects.

    :param ldb: Connection used for get_default_basedn() and search()
    :return: A list of dicts with the keys 'name', 'hostname' and
             'cACertificate' (base64 text); empty if nothing was found
    """
    # Autoenrollment MUST do an LDAP search for the CA information
    # (pKIEnrollmentService) objects under the following container:
    container = 'CN=Enrollment Services,CN=Public Key Services,CN=Services,CN=Configuration,%s' % ldb.get_default_basedn()
    services = ldb.search(container, SCOPE_SUBTREE,
                          '(objectClass=pKIEnrollmentService)',
                          ['cACertificate', 'cn', 'dNSHostName'])
    return [{'name': get_string(service['cn'][0]),
             'hostname': get_string(service['dNSHostName'][0]),
             'cACertificate': get_string(
                 base64.b64encode(service['cACertificate'][0]))}
            for service in services]
165
def fetch_template_attrs(ldb, name, attrs=None):
    """Read attributes of the certificate template called name.

    :param ldb: Connection used for get_default_basedn() and search()
    :param name: The cn of the certificate template
    :param attrs: Attributes to request; defaults to the minimal key size
    :return: The template attributes as a dict. When the search does not
             yield exactly one entry carrying msPKI-Minimal-Key-Size, a
             dict with a default key size of '2048' is returned instead.
    """
    requested = ['msPKI-Minimal-Key-Size'] if attrs is None else attrs
    container = 'CN=Certificate Templates,CN=Public Key Services,CN=Services,CN=Configuration,%s' % ldb.get_default_basedn()
    found = ldb.search(container, SCOPE_SUBTREE, '(cn=%s)' % name, requested)
    if len(found) != 1 or 'msPKI-Minimal-Key-Size' not in found[0]:
        return {'msPKI-Minimal-Key-Size': ['2048']}
    return dict(found[0])
177
def format_root_cert(cert):
    """Wrap a base64 certificate string in PEM armor.

    A newline is inserted after every complete run of 64 characters and the
    result is substituted into the cert_wrap template.

    :param cert: The base64 encoded certificate as str
    :return: The armored certificate as bytes
    """
    # count/flags are passed by keyword: passing them positionally to
    # re.sub() is deprecated since Python 3.13.
    body = re.sub(b"(.{64})", b"\\1\n", cert.encode(),
                  count=0, flags=re.DOTALL)
    return cert_wrap % body
180
def find_cepces_submit():
    """Locate the cepces-submit helper.

    Looks in PATH first and then in the certmonger helper directories.

    :return: The path of cepces-submit, or None if it was not found
    """
    # PATH may be unset; joining None would raise a TypeError, so fall back
    # to the platform default search path in that case.
    certmonger_dirs = [os.environ.get("PATH", os.defpath),
                       '/usr/lib/certmonger',
                       '/usr/libexec/certmonger']
    return which('cepces-submit', path=':'.join(certmonger_dirs))
185
def get_supported_templates(server):
    """Ask the CA for the certificate templates it supports.

    Runs cepces-submit with CERTMONGER_OPERATION=GET-SUPPORTED-TEMPLATES
    using Kerberos authentication.

    :param server: Host name of the enrollment server
    :return: The whitespace separated tokens of the helper's standard
             output as a list of bytes; an empty list if cepces-submit is
             not installed
    """
    cepces_submit = find_cepces_submit()
    if not cepces_submit:
        log.error('Failed to find cepces-submit')
        return []

    # Work on a copy: assigning into os.environ itself would leak
    # CERTMONGER_OPERATION into this process and every later child process.
    env = os.environ.copy()
    env['CERTMONGER_OPERATION'] = 'GET-SUPPORTED-TEMPLATES'
    p = Popen([cepces_submit, '--server=%s' % server, '--auth=Kerberos'],
              env=env, stdout=PIPE, stderr=PIPE)
    out, err = p.communicate()
    if p.returncode != 0:
        data = {'Error': err.decode()}
        log.error('Failed to fetch the list of supported templates.', data)
    return out.strip().split()
201
202
def _der_certificate_to_pem(der_certificate):
    """Return the PEM encoding of one DER encoded X.509 certificate."""
    # Older versions of load_der_x509_certificate require a backend param
    try:
        cert = load_der_x509_certificate(der_certificate)
    except TypeError:
        cert = load_der_x509_certificate(der_certificate, default_backend())
    return cert.public_bytes(Encoding.PEM)


def getca(ca, url, trust_dir):
    """Fetch Certificate Chain from the CA.

    Sends a GetCACert request to url and stores the returned certificate(s)
    PEM encoded below trust_dir. If no usable answer is received and ca
    carries a 'cACertificate' entry, only that certificate is installed.

    :param ca: Dict describing the CA; 'name' is required, 'cACertificate'
               (base64 encoded DER) is optional
    :param url: The URL the GetCACert request is sent to
    :param trust_dir: Directory the certificate files are written to
    :return: List of the certificate file paths that were written
    """
    root_cert = os.path.join(trust_dir, '%s.crt' % ca['name'])
    root_certs = []

    try:
        r = requests.get(url=url, params={'operation': 'GetCACert',
                                          'message': 'CAIdentifier'})
    except requests.exceptions.ConnectionError:
        log.warn('Failed to establish a new connection')
        r = None

    content_type = ''
    if r is not None:
        # The header may be absent or carry parameters such as
        # "; charset=utf-8"; compare on the bare media type only.
        content_type = r.headers.get('Content-Type', '')
        content_type = content_type.split(';', 1)[0].strip().lower()

    if r is None or r.content == b'' or content_type == 'text/html':
        log.warn('Failed to fetch the root certificate chain.')
        log.warn('The Network Device Enrollment Service is either not' +
                 ' installed or not configured.')
        if 'cACertificate' in ca:
            log.warn('Installing the server certificate only.')
            der_certificate = base64.b64decode(ca['cACertificate'])
            cert_data = _der_certificate_to_pem(der_certificate)
            with open(root_cert, 'wb') as w:
                w.write(cert_data)
            root_certs.append(root_cert)
        return root_certs

    if content_type == 'application/x-x509-ca-cert':
        # A single DER encoded certificate
        cert_data = _der_certificate_to_pem(r.content)
        with open(root_cert, 'wb') as w:
            w.write(cert_data)
        root_certs.append(root_cert)
    elif content_type == 'application/x-x509-ca-ra-cert':
        # A PKCS#7 bundle; every certificate gets its own numbered file
        filename, extension = root_cert.rsplit('.', 1)
        certs = load_der_pkcs7_certificates(r.content)
        for i, chain_cert in enumerate(certs):
            dest = '%s.%d.%s' % (filename, i, extension)
            with open(dest, 'wb') as w:
                w.write(chain_cert.public_bytes(Encoding.PEM))
            root_certs.append(dest)
    else:
        log.warn('getca: Wrong (or missing) MIME content type')

    return root_certs
255
256
def find_global_trust_dir():
    """Pick the system trust anchor directory for this distribution.

    :return: The first entry of global_trust_dirs that is an existing
             directory, or the first entry if none of them exists
    """
    existing = (candidate for candidate in global_trust_dirs
                if os.path.isdir(candidate))
    return next(existing, global_trust_dirs[0])
263
def update_ca_command():
    """Find the tool that refreshes the system CA trust store.

    :return: Path of update-ca-certificates if present, otherwise the
             lookup result for update-ca-trust (None if neither exists)
    """
    located = None
    for tool in ('update-ca-certificates', 'update-ca-trust'):
        located = which(tool)
        if located:
            break
    return located
267
def changed(new_data, old_data):
    """Tell whether a key shared by both dicts maps to different values.

    Keys that only exist in one of the two dicts are ignored.
    """
    for key in new_data.keys():
        if key in old_data and new_data[key] != old_data[key]:
            return True
    return False
272
def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'):
    """Install the root certificate chain and set up auto enrollment.

    The certificates obtained through getca() are symlinked into the global
    trust directory and the trust store is refreshed. If both getcert and
    cepces-submit are available, the CA is registered with certmonger and a
    certificate is requested for every template the CA supports.

    :param ca: Dict describing the CA; 'name' and 'hostname' are required
    :param ldb: Connection handed to fetch_template_attrs()
    :param trust_dir: Directory for the certificate files
    :param private_dir: Directory for the private key files
    :param auth: Authentication scheme passed to cepces-submit
    :return: JSON string of the ca dict extended by 'files' (every path
             that was created) and 'templates' (the requested nicknames)
    """
    data = dict({'files': [], 'templates': []}, **ca)
    url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % ca['hostname']

    log.info("Try to get root or server certificates")

    root_certs = getca(ca, url, trust_dir)
    data['files'].extend(root_certs)
    global_trust_dir = find_global_trust_dir()
    for src in root_certs:
        # Symlink the certs to global trust dir
        dst = os.path.join(global_trust_dir, os.path.basename(src))
        try:
            os.symlink(src, dst)
            data['files'].append(dst)
            log.info("Created symlink: %s -> %s" % (src, dst))
        except PermissionError:
            log.warn('Failed to symlink root certificate to the'
                     ' admin trust anchors')
        except FileNotFoundError:
            log.warn('Failed to symlink root certificate to the'
                     ' admin trust anchors.'
                     ' The directory was not found', global_trust_dir)
        except FileExistsError:
            # If we're simply downloading a renewed cert, the symlink
            # already exists. Ignore the FileExistsError. Preserve the
            # existing symlink in the unapply data.
            data['files'].append(dst)

    update = update_ca_command()
    if update is not None:
        log.info("Running %s" % (update))
        ret = Popen([update]).wait()
        if ret != 0:
            log.error('Failed to run %s' % (update))

    # Setup Certificate Auto Enrollment
    getcert = which('getcert')
    cepces_submit = find_cepces_submit()
    if getcert is not None and cepces_submit is not None:
        p = Popen([getcert, 'add-ca', '-c', ca['name'], '-e',
                  '%s --server=%s --auth=%s' % (cepces_submit,
                  ca['hostname'], auth)],
                  stdout=PIPE, stderr=PIPE)
        out, err = p.communicate()
        log.debug(out.decode())
        if p.returncode != 0:
            if p.returncode == 2:
                log.info('The CA [%s] already exists' % ca['name'])
            else:
                # Keep the error details in their own dict; rebinding
                # 'data' here would discard the files/templates record.
                edata = {'Error': err.decode(), 'CA': ca['name']}
                log.error('Failed to add Certificate Authority', edata)

        supported_templates = get_supported_templates(ca['hostname'])
        for template in supported_templates:
            template_name = template.decode()
            attrs = fetch_template_attrs(ldb, template)
            nickname = '%s.%s' % (ca['name'], template_name)
            keyfile = os.path.join(private_dir, '%s.key' % nickname)
            certfile = os.path.join(trust_dir, '%s.crt' % nickname)
            p = Popen([getcert, 'request', '-c', ca['name'],
                       '-T', template_name,
                       '-I', nickname, '-k', keyfile, '-f', certfile,
                       '-g', attrs['msPKI-Minimal-Key-Size'][0]],
                       stdout=PIPE, stderr=PIPE)
            out, err = p.communicate()
            log.debug(out.decode())
            if p.returncode != 0:
                if p.returncode == 2:
                    log.info('The template [%s] already exists' % (nickname))
                else:
                    edata = {'Error': err.decode(), 'Certificate': nickname}
                    log.error('Failed to request certificate', edata)

            data['files'].extend([keyfile, certfile])
            data['templates'].append(nickname)
        if update is not None:
            ret = Popen([update]).wait()
            if ret != 0:
                log.error('Failed to run %s' % (update))
    else:
        log.warn('certmonger and cepces must be installed for ' +
                 'certificate auto enrollment to work')
    return json.dumps(data)
357
class gp_cert_auto_enroll_ext(gp_pol_ext, gp_applier):
    """Group Policy extension for certificate auto enrollment.

    Reads the AEPolicy value of the AutoEnrollment registry policy and,
    when enrollment is enabled, enrolls with every certification authority
    that is found either in the configured policy servers or in LDAP.
    """

    def __str__(self):
        return r'Cryptography\AutoEnrollment'

    def unapply(self, guid, attribute, value):
        """Remove a previously applied CA.

        :param guid: The GPO the cached attribute belongs to
        :param attribute: The base64 encoded CA name
        :param value: JSON string as produced by cert_enroll(); its
                      'templates' and 'files' entries are cleaned up
        """
        ca_cn = base64.b64decode(attribute)
        data = json.loads(value)
        getcert = which('getcert')
        if getcert is not None:
            Popen([getcert, 'remove-ca', '-c', ca_cn]).wait()
            for nickname in data['templates']:
                Popen([getcert, 'stop-tracking', '-i', nickname]).wait()
        for f in data['files']:
            # lexists() instead of exists(): the trust anchor symlinks are
            # listed after their targets, so they are already dangling by
            # the time they are reached and exists() would skip them.
            if os.path.lexists(f):
                os.unlink(f)
        self.cache_remove_attribute(guid, attribute)

    def apply(self, guid, ca, applier_func, *args, **kwargs):
        """Apply the policy for one CA unless it is applied and unchanged.

        :param guid: The GPO the CA is recorded under
        :param ca: Dict describing the CA ('name' and 'hostname' are read)
        :param applier_func: Callable performing the enrollment; its return
                             value is stored in the cache
        :param args: Positional arguments for applier_func
        :param kwargs: Keyword arguments for applier_func
        """
        attribute = base64.b64encode(ca['name'].encode()).decode()
        old_val = self.cache_get_attribute_value(guid, attribute)
        old_data = json.loads(old_val) if old_val is not None else {}
        # The template list is only needed to detect changes against a
        # previously applied policy.
        templates = ['%s.%s' % (ca['name'], t.decode()) for t in get_supported_templates(ca['hostname'])] \
            if old_val is not None else []
        new_data = { 'templates': templates, **ca }
        policy_changed = changed(new_data, old_data)
        enforce = self.cache_get_apply_state() == GPOSTATE.ENFORCE

        if old_val is not None:
            # If policy is already applied and unchanged, skip application
            if not policy_changed and not enforce:
                return
            # If the policy has changed, unapply, then apply new policy.
            # This is only done when something was applied before; there
            # is nothing to parse or remove otherwise.
            self.unapply(guid, attribute, old_val)

        # Apply the policy and log the changes
        data = applier_func(*args, **kwargs)
        self.cache_add_attribute(guid, attribute, data)

    def process_group_policy(self, deleted_gpo_list, changed_gpo_list,
                             trust_dir=None, private_dir=None):
        """Unapply deleted GPOs and process the AEPolicy of changed ones.

        :param deleted_gpo_list: Iterable of (guid, settings) pairs
        :param changed_gpo_list: Iterable of GPO objects providing
                                 file_sys_path and name
        :param trust_dir: Certificate directory; defaults to the 'certs'
                          cache path
        :param private_dir: Private key directory; defaults to the 'certs'
                            private path
        """
        if trust_dir is None:
            trust_dir = self.lp.cache_path('certs')
        if private_dir is None:
            private_dir = self.lp.private_path('certs')
        if not os.path.exists(trust_dir):
            os.mkdir(trust_dir, mode=0o755)
        if not os.path.exists(private_dir):
            os.mkdir(private_dir, mode=0o700)

        for guid, settings in deleted_gpo_list:
            if str(self) in settings:
                for ca_cn_enc, data in settings[str(self)].items():
                    self.unapply(guid, ca_cn_enc, data)

        section = r'Software\Policies\Microsoft\Cryptography\AutoEnrollment'
        pol_file = 'MACHINE/Registry.pol'
        for gpo in changed_gpo_list:
            if not gpo.file_sys_path:
                continue
            path = os.path.join(gpo.file_sys_path, pol_file)
            pol_conf = self.parse(path)
            if not pol_conf:
                continue
            for e in pol_conf.entries:
                if e.keyname != section or e.valuename != 'AEPolicy':
                    continue
                # This policy applies as specified in [MS-CAESO] 4.4.5.1
                if e.data & 0x8000:
                    continue # The policy is disabled
                # Only the enroll bit (0x1) is acted upon here; the bits
                # 0x2 and 0x4 are not evaluated.
                enroll = (e.data & 0x1) == 0x1
                if enroll:
                    ca_names = self.__enroll(gpo.name,
                                             pol_conf.entries,
                                             trust_dir, private_dir)

                    # Cleanup any old CAs that have been removed
                    ca_attrs = [base64.b64encode(n.encode()).decode()
                                for n in ca_names]
                    self.clean(gpo.name, keep=ca_attrs)
                else:
                    # If enrollment has been disabled for this GPO,
                    # remove any existing policy
                    ca_attrs = \
                        self.cache_get_all_attribute_values(gpo.name)
                    self.clean(gpo.name, remove=list(ca_attrs.keys()))

    def __read_cep_data(self, guid, ldb, end_point_information,
                        trust_dir, private_dir):
        """Read CEP Data.

        [MS-CAESO] 4.4.5.3.2.4
        In this step autoenrollment initializes instances of the
        CertificateEnrollmentPolicy by accessing end points associated with CEP
        groups created in the previous step.

        :return: The names of all CAs that were applied; an empty list if
                 no group qualified
        """
        # Collected across all groups and returned after the loop, so that
        # every group is visited and the caller always receives a list.
        ca_names = []

        # For each group created in the previous step:
        for end_point_group in end_point_information:
            # Pick an arbitrary instance of the
            # CertificateEnrollmentPolicyEndPoint from the group
            e = end_point_group[0]

            # If this instance does not have the AutoEnrollmentEnabled flag set
            # in the EndPoint.Flags, continue with the next group.
            if not e['Flags'] & 0x10:
                continue

            # If the current group contains a
            # CertificateEnrollmentPolicyEndPoint instance with EndPoint.URI
            # equal to "LDAP" (compared without regard to case, as the end
            # point parser accepts any case):
            if any(ep['URL'].lower() == 'ldap:' for ep in end_point_group):
                # Perform an LDAP search to read the value of the objectGuid
                # attribute of the root object of the forest root domain NC. If
                # any errors are encountered, continue with the next group.
                res = ldb.search('', SCOPE_BASE, '(objectClass=*)',
                                 ['rootDomainNamingContext'])
                if len(res) != 1:
                    continue
                res2 = ldb.search(res[0]['rootDomainNamingContext'][0],
                                  SCOPE_BASE, '(objectClass=*)',
                                  ['objectGUID'])
                if len(res2) != 1:
                    continue

                # Compare the value read in the previous step to the
                # EndPoint.PolicyId datum CertificateEnrollmentPolicyEndPoint
                # instance. If the values do not match, continue with the next
                # group.
                objectGUID = '{%s}' % \
                    octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper()
                if objectGUID != e['PolicyID']:
                    continue

            # For each CertificateEnrollmentPolicyEndPoint instance for that
            # group:
            for ca in end_point_group:
                end_point_url = ca['URL'].lower()
                # If EndPoint.URI equals "LDAP":
                if end_point_url == 'ldap:':
                    # This is a basic configuration.
                    cas = fetch_certification_authorities(ldb)
                    for _ca in cas:
                        self.apply(guid, _ca, cert_enroll, _ca, ldb, trust_dir,
                                   private_dir)
                        ca_names.append(_ca['name'])
                # If EndPoint.URI starts with "HTTPS//":
                elif end_point_url.startswith('https://'):
                    self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir,
                               private_dir, auth=ca['auth'])
                    ca_names.append(ca['name'])
                else:
                    edata = { 'endpoint': ca['URL'] }
                    log.error('Unrecognized endpoint', edata)
        return ca_names

    def __enroll(self, guid, entries, trust_dir, private_dir):
        """Enroll with every CA relevant for the GPO.

        Uses the policy servers found in entries; when there are none, the
        CAs published in LDAP are used instead.

        :return: The names of the CAs that were applied
        """
        url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
        ldb = Ldb(url=url, session_info=system_session(),
                  lp=self.lp, credentials=self.creds)

        ca_names = []
        end_point_information = obtain_end_point_information(entries)
        if len(end_point_information) > 0:
            ca_names.extend(self.__read_cep_data(guid, ldb,
                                                 end_point_information,
                                                 trust_dir, private_dir))
        else:
            cas = fetch_certification_authorities(ldb)
            for ca in cas:
                self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir,
                           private_dir)
                ca_names.append(ca['name'])
        return ca_names

    def rsop(self, gpo):
        """Describe the auto enrollment policy a GPO would apply.

        :param gpo: GPO object providing file_sys_path
        :return: Dict that is empty, or maps 'Auto Enrollment Policy' to a
                 dict of per-CA details keyed by the CA name
        """
        output = {}
        pol_file = 'MACHINE/Registry.pol'
        section = r'Software\Policies\Microsoft\Cryptography\AutoEnrollment'
        policy = 'Auto Enrollment Policy'
        if not gpo.file_sys_path:
            return output
        path = os.path.join(gpo.file_sys_path, pol_file)
        pol_conf = self.parse(path)
        if not pol_conf:
            return output
        for e in pol_conf.entries:
            if e.keyname != section or e.valuename != 'AEPolicy':
                continue
            enroll = (e.data & 0x1) == 0x1
            if e.data & 0x8000 or not enroll:
                continue
            output[policy] = {}
            url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
            ldb = Ldb(url=url, session_info=system_session(),
                      lp=self.lp, credentials=self.creds)
            end_point_information = \
                obtain_end_point_information(pol_conf.entries)
            cas = fetch_certification_authorities(ldb)
            if len(end_point_information) > 0:
                # Flatten the groups into a single list of end points
                cas2 = [ep for sl in end_point_information for ep in sl]
                if any(ca['URL'].lower() == 'ldap:' for ca in cas2):
                    cas.extend(cas2)
                else:
                    cas = cas2
            for ca in cas:
                # The LDAP end point itself is not a CA; the CAs it stands
                # for were fetched from LDAP above.
                if ca.get('URL', '').lower() == 'ldap:':
                    continue
                cn = ca['name']
                output[policy][cn] = {}
                if 'cACertificate' in ca:
                    output[policy][cn]['CA Certificate'] = \
                        format_root_cert(ca['cACertificate']).decode()
                output[policy][cn]['Auto Enrollment Server'] = \
                    ca['hostname']
                supported_templates = \
                    get_supported_templates(ca['hostname'])
                output[policy][cn]['Templates'] = \
                    [t.decode() for t in supported_templates]
        return output