python:gp: Avoid path check for cepces-submit
[samba.git] / python / samba / gp / gp_cert_auto_enroll_ext.py
1 # gp_cert_auto_enroll_ext samba group policy
2 # Copyright (C) David Mulder <dmulder@suse.com> 2021
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 import os
18 import operator
19 import requests
20 from samba.gp.gpclass import gp_pol_ext, gp_applier, GPOSTATE
21 from samba import Ldb
22 from ldb import SCOPE_SUBTREE, SCOPE_BASE
23 from samba.auth import system_session
24 from samba.gp.gpclass import get_dc_hostname
25 import base64
26 from shutil import which
27 from subprocess import Popen, PIPE
28 import re
29 import json
30 from samba.gp.util.logging import log
31 import struct
32 try:
33     from cryptography.hazmat.primitives.serialization.pkcs7 import \
34         load_der_pkcs7_certificates
35 except ModuleNotFoundError:
36     def load_der_pkcs7_certificates(x): return []
37     log.error('python cryptography missing pkcs7 support. '
38               'Certificate chain parsing will fail')
39 from cryptography.hazmat.primitives.serialization import Encoding
40 from cryptography.x509 import load_der_x509_certificate
41 from cryptography.hazmat.backends import default_backend
42 from samba.common import get_string
43
# PEM armor around a certificate body; the single %s slot is filled using
# bytes %-formatting.
cert_wrap = (b"\n"
             b"-----BEGIN CERTIFICATE-----\n"
             b"%s\n"
             b"-----END CERTIFICATE-----")

# Pattern for a CEP web service URL. It captures the server host name and
# the authentication type that is embedded in the URL path.
endpoint_re = ('(https|HTTPS)://(?P<server>[a-zA-Z0-9.-]+)/ADPolicyProvider'
               '_CEP_(?P<auth>[a-zA-Z]+)/service.svc/CEP')

# Candidate system trust anchor directories, in lookup order.
global_trust_dirs = [
    '/etc/pki/trust/anchors',            # SUSE
    '/etc/pki/ca-trust/source/anchors',  # RHEL/Fedora
    '/usr/local/share/ca-certificates',  # Debian/Ubuntu
]
54
def octet_string_to_objectGUID(data):
    """Convert an octet string to an objectGUID.

    The first three fields are unpacked as little-endian and the remaining
    ones as big-endian. Every field is zero-padded to its full width, so the
    result always has the 8-4-4-4-12 hex digit layout (padding each field to
    only two digits would drop leading zeros and produce a string that does
    not compare equal to the textual GUID).

    :param data: The 16 byte binary objectGUID value.
    :return: The GUID as a lower-case hex string, without braces.
    :raises struct.error: If data does not hold the expected number of bytes.
    """
    data1, data2, data3 = struct.unpack('<LHH', data[0:8])
    data4, node_hi, node_lo = struct.unpack('>HHL', data[8:])
    return '%08x-%04x-%04x-%04x-%04x%08x' % (data1, data2, data3,
                                             data4, node_hi, node_lo)
62
63
def group_and_sort_end_point_information(end_point_information):
    """Group and Sort End Point Information.

    [MS-CAESO] 4.4.5.3.2.3
    The end points are bucketed by their 'PolicyID'. Each bucket is ordered
    by ascending 'Cost'; end points sharing a cost are ordered Kerberos
    first, Anonymous second, everything else afterwards.

    :param end_point_information: Iterable of end point dicts.
    :return: A list with one sorted list of end points per PolicyID.
    """
    def auth_rank(end_point):
        flags = end_point['AuthFlags']
        if flags == 0x2:    # Kerberos
            return 0
        if flags == 0x1:    # Anonymous
            return 1
        return 2            # anything else, order is arbitrary

    # Bucket the end points by the EndPoint.PolicyID datum.
    buckets = {}
    for end_point in end_point_information:
        buckets.setdefault(end_point['PolicyID'], []).append(end_point)

    for members in buckets.values():
        # Ascending order of EndPoint.Cost.
        members.sort(key=lambda end_point: end_point['Cost'])

        # Walk the runs of equal cost. Only runs holding more than one end
        # point are re-ordered by authentication type.
        total = len(members)
        start = 0
        while start < total:
            stop = start + 1
            run_cost = members[start]['Cost']
            while stop < total and members[stop]['Cost'] == run_cost:
                stop += 1
            if stop - start > 1:
                members[start:stop] = sorted(members[start:stop],
                                             key=auth_rank)
            start = stop
    return list(buckets.values())
111
def obtain_end_point_information(entries):
    """Obtain End Point Information.

    [MS-CAESO] 4.4.5.3.2.2
    Collects the registry values found below the PolicyServers key into one
    dict per policy server, derives 'name', 'hostname' and 'auth' for the
    servers whose 'URL' matches the CEP endpoint pattern, and hands the
    result to group_and_sort_end_point_information().

    :param entries: Registry policy entries (keyname, valuename, data).
    :return: The grouped and sorted end points, or an empty dict when a URL
             is neither a recognized CEP endpoint nor 'ldap:'.
    """
    section = 'Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\'
    policy_servers = {}
    for entry in entries:
        if entry.keyname.startswith(section):
            server_key = entry.keyname.replace(section, '')
            values = policy_servers.setdefault(server_key, {})
            values[entry.valuename] = entry.data

    for server in policy_servers.values():
        url = server['URL']
        parsed = re.match(endpoint_re, url)
        if parsed is None:
            if url.lower() == 'ldap:':
                # Plain LDAP end points carry no host or auth information.
                continue
            log.error('Failed to parse the endpoint', {'endpoint': url})
            return {}
        host = parsed.group('server')
        server['name'] = '%s-CA' % host.replace('.', '-')
        server['hostname'] = host
        server['auth'] = parsed.group('auth')

    return group_and_sort_end_point_information(policy_servers.values())
142
def fetch_certification_authorities(ldb):
    """Initialize CAs.

    [MS-CAESO] 4.4.5.3.1.2
    Autoenrollment MUST do an LDAP search for the CA information
    (pKIEnrollmentService) objects under the Enrollment Services container
    of the configuration partition.

    :param ldb: Connection used for the search.
    :return: A list of dicts with the keys 'name', 'hostname' and
             'cACertificate' (the certificate base64 encoded as a string).
    """
    container = ('CN=Enrollment Services,CN=Public Key Services,CN=Services,'
                 'CN=Configuration,%s' % ldb.get_default_basedn())
    services = ldb.search(container, SCOPE_SUBTREE,
                          '(objectClass=pKIEnrollmentService)',
                          ['cACertificate', 'cn', 'dNSHostName'])
    return [{'name': get_string(service['cn'][0]),
             'hostname': get_string(service['dNSHostName'][0]),
             'cACertificate': get_string(
                 base64.b64encode(service['cACertificate'][0]))}
            for service in services]
165
def fetch_template_attrs(ldb, name, attrs=None):
    """Fetch attributes of a certificate template.

    :param ldb: Connection used for the search.
    :param name: The cn of the certificate template, as str or bytes.
    :param attrs: Attributes to request; defaults to
                  ['msPKI-Minimal-Key-Size'].
    :return: The attributes of the template as a dict when exactly one
             template matched and it carries 'msPKI-Minimal-Key-Size',
             otherwise a default of {'msPKI-Minimal-Key-Size': ['2048']}.
    """
    if attrs is None:
        attrs = ['msPKI-Minimal-Key-Size']
    # Template names parsed from the cepces-submit output are bytes.
    # Formatting bytes with %s would put "b'...'" into the filter, which can
    # never match, so the default key size would always be returned.
    if isinstance(name, bytes):
        name = name.decode()
    basedn = ldb.get_default_basedn()
    dn = 'CN=Certificate Templates,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn
    expr = '(cn=%s)' % name
    res = ldb.search(dn, SCOPE_SUBTREE, expr, attrs)
    if len(res) == 1 and 'msPKI-Minimal-Key-Size' in res[0]:
        return dict(res[0])
    return {'msPKI-Minimal-Key-Size': ['2048']}
177
def format_root_cert(cert):
    """Wrap a certificate string in PEM armor.

    The text is encoded to bytes, broken into lines of at most 64 bytes and
    placed between the BEGIN/END CERTIFICATE markers of cert_wrap. Joining
    the chunks (instead of appending a newline after every full chunk) keeps
    a blank line from appearing before the END marker when the length is an
    exact multiple of 64.

    :param cert: The certificate body as str.
    :return: The armored certificate as bytes.
    """
    raw = cert.encode()
    lines = [raw[pos:pos + 64] for pos in range(0, len(raw), 64)]
    return cert_wrap % b"\n".join(lines)
180
def find_cepces_submit():
    """Locate the cepces-submit helper.

    The search covers the directories of PATH followed by the certmonger
    helper directories. When PATH is unset or empty, os.defpath is used in
    its place so that joining the search path cannot fail on a None entry.

    :return: The path of cepces-submit, or None when it was not found.
    """
    search_dirs = [os.environ.get("PATH") or os.defpath,
                   '/usr/lib/certmonger',
                   '/usr/libexec/certmonger']
    return which('cepces-submit', path=':'.join(search_dirs))
185
def get_supported_templates(server):
    """Ask cepces-submit for the certificate templates a server supports.

    :param server: Host name passed to cepces-submit as --server.
    :return: The whitespace separated words of the command output as a list
             of bytes, or an empty list when cepces-submit is not installed.
    """
    cepces_submit = find_cepces_submit()
    if not cepces_submit:
        log.error('Failed to find cepces-submit')
        return []

    # Hand the child a private copy of the environment. Assigning into
    # os.environ itself would leave CERTMONGER_OPERATION set for this
    # process and for every subprocess started afterwards.
    env = dict(os.environ, CERTMONGER_OPERATION='GET-SUPPORTED-TEMPLATES')
    p = Popen([cepces_submit, '--server=%s' % server, '--auth=Kerberos'],
              env=env, stdout=PIPE, stderr=PIPE)
    out, err = p.communicate()
    if p.returncode != 0:
        data = {'Error': err.decode()}
        log.error('Failed to fetch the list of supported templates.', data)
    return out.strip().split()
201
202
def _load_der_x509(der_certificate):
    """Parse a DER encoded X.509 certificate."""
    # Older versions of load_der_x509_certificate require a backend param
    try:
        return load_der_x509_certificate(der_certificate)
    except TypeError:
        return load_der_x509_certificate(der_certificate, default_backend())


def _write_pem(cert, dest):
    """Write a certificate object to dest in PEM encoding."""
    cert_data = cert.public_bytes(Encoding.PEM)
    with open(dest, 'wb') as w:
        w.write(cert_data)


def getca(ca, url, trust_dir):
    """Fetch Certificate Chain from the CA.

    Issues a GetCACert request against url and stores what is returned
    below trust_dir. When the request fails, returns nothing or returns
    HTML, the certificate from ca['cACertificate'] (if present) is installed
    instead.

    :param ca: Dict describing the CA; 'name' is required, 'cACertificate'
               (base64 encoded DER) is used as a fallback when present.
    :param url: The URL the GetCACert request is sent to.
    :param trust_dir: Directory the certificate files are written to.
    :return: The list of certificate files that were written.
    """
    root_cert = os.path.join(trust_dir, '%s.crt' % ca['name'])
    root_certs = []

    try:
        r = requests.get(url=url, params={'operation': 'GetCACert',
                                          'message': 'CAIdentifier'})
    except requests.exceptions.ConnectionError:
        log.warn('Failed to establish a new connection')
        r = None
    # Look the header up with get(): a response without a Content-Type must
    # reach the "missing MIME content type" warning, not raise KeyError.
    content_type = r.headers.get('Content-Type') if r is not None else None
    if r is None or r.content == b'' or content_type == 'text/html':
        log.warn('Failed to fetch the root certificate chain.')
        log.warn('The Network Device Enrollment Service is either not' +
                 ' installed or not configured.')
        if 'cACertificate' in ca:
            log.warn('Installing the server certificate only.')
            der_certificate = base64.b64decode(ca['cACertificate'])
            _write_pem(_load_der_x509(der_certificate), root_cert)
            root_certs.append(root_cert)
        return root_certs

    if content_type == 'application/x-x509-ca-cert':
        _write_pem(_load_der_x509(r.content), root_cert)
        root_certs.append(root_cert)
    elif content_type == 'application/x-x509-ca-ra-cert':
        # One numbered file per certificate: <name>.<index>.crt
        filename, extension = root_cert.rsplit('.', 1)
        for i, cert in enumerate(load_der_pkcs7_certificates(r.content)):
            dest = '%s.%d.%s' % (filename, i, extension)
            _write_pem(cert, dest)
            root_certs.append(dest)
    else:
        log.warn('getca: Wrong (or missing) MIME content type')

    return root_certs
255
256
def find_global_trust_dir():
    """Return the global trust dir using known paths from various Linux distros.

    The first entry of global_trust_dirs that is an existing directory wins;
    when none of them exists the first entry is returned.
    """
    fallback = global_trust_dirs[0]
    existing = (path for path in global_trust_dirs if os.path.isdir(path))
    return next(existing, fallback)
263
def update_ca_command():
    """Return the command to update the CA trust store.

    'update-ca-certificates' is preferred over 'update-ca-trust'; None is
    returned when neither command is found.
    """
    for candidate in ('update-ca-certificates', 'update-ca-trust'):
        command = which(candidate)
        if command:
            return command
    return None
267
def changed(new_data, old_data):
    """Return True if any key present in both dicts has changed.

    Keys that exist in only one of the two dicts are ignored.
    """
    for key, new_value in new_data.items():
        if key in old_data and new_value != old_data[key]:
            return True
    return False
272
def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'):
    """Install the root certificate chain.

    Fetches the CA certificates into trust_dir, symlinks them into the
    global trust dir, refreshes the system trust store and, when both
    getcert and cepces-submit are available, registers the CA with
    certmonger and requests a certificate for every supported template.

    :param ca: Dict describing the CA; 'name' and 'hostname' are required.
    :param ldb: Connection used to look up certificate template attributes.
    :param trust_dir: Directory for the CA and issued certificates.
    :param private_dir: Directory for the generated private keys.
    :param auth: Authentication type passed to cepces-submit.
    :return: A JSON string holding the ca dict plus 'files' (everything that
             was installed) and 'templates' (the certmonger nicknames).
    """
    data = dict({'files': [], 'templates': []}, **ca)
    url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % ca['hostname']
    root_certs = getca(ca, url, trust_dir)
    data['files'].extend(root_certs)
    global_trust_dir = find_global_trust_dir()
    for src in root_certs:
        # Symlink the certs to global trust dir
        dst = os.path.join(global_trust_dir, os.path.basename(src))
        try:
            os.symlink(src, dst)
            data['files'].append(dst)
        except PermissionError:
            log.warn('Failed to symlink root certificate to the'
                     ' admin trust anchors')
        except FileNotFoundError:
            log.warn('Failed to symlink root certificate to the'
                     ' admin trust anchors.'
                     ' The directory was not found', global_trust_dir)
        except FileExistsError:
            # If we're simply downloading a renewed cert, the symlink
            # already exists. Ignore the FileExistsError. Preserve the
            # existing symlink in the unapply data.
            data['files'].append(dst)
    update = update_ca_command()
    if update is not None:
        Popen([update]).wait()
    # Setup Certificate Auto Enrollment
    getcert = which('getcert')
    cepces_submit = find_cepces_submit()
    if getcert is not None and cepces_submit is not None:
        p = Popen([getcert, 'add-ca', '-c', ca['name'], '-e',
                  '%s --server=%s --auth=%s' % (cepces_submit,
                  ca['hostname'], auth)],
                  stdout=PIPE, stderr=PIPE)
        out, err = p.communicate()
        log.debug(out.decode())
        if p.returncode != 0:
            # The error details live in their own dict. Rebinding 'data'
            # here would throw away the result being built and make the
            # data['files'] / data['templates'] accesses below fail.
            edata = {'Error': err.decode(), 'CA': ca['name']}
            log.error('Failed to add Certificate Authority', edata)
        supported_templates = get_supported_templates(ca['hostname'])
        for template in supported_templates:
            attrs = fetch_template_attrs(ldb, template)
            nickname = '%s.%s' % (ca['name'], template.decode())
            keyfile = os.path.join(private_dir, '%s.key' % nickname)
            certfile = os.path.join(trust_dir, '%s.crt' % nickname)
            p = Popen([getcert, 'request', '-c', ca['name'],
                       '-T', template.decode(),
                       '-I', nickname, '-k', keyfile, '-f', certfile,
                       '-g', attrs['msPKI-Minimal-Key-Size'][0]],
                       stdout=PIPE, stderr=PIPE)
            out, err = p.communicate()
            log.debug(out.decode())
            if p.returncode != 0:
                edata = {'Error': err.decode(), 'Certificate': nickname}
                log.error('Failed to request certificate', edata)
            data['files'].extend([keyfile, certfile])
            data['templates'].append(nickname)
        if update is not None:
            Popen([update]).wait()
    else:
        log.warn('certmonger and cepces must be installed for ' +
                 'certificate auto enrollment to work')
    return json.dumps(data)
338
class gp_cert_auto_enroll_ext(gp_pol_ext, gp_applier):
    """Group policy extension for certificate auto enrollment.

    Reads the AEPolicy value of the AutoEnrollment registry policy and,
    when enrollment is enabled, enrolls with the configured certification
    authorities via cert_enroll(). What was installed per CA is cached as a
    JSON string under the base64 encoded CA name so it can be undone later.
    """

    def __str__(self):
        return r'Cryptography\AutoEnrollment'

    def unapply(self, guid, attribute, value):
        """Undo the enrollment recorded for one CA.

        :param guid: The GPO the cached attribute belongs to.
        :param attribute: The base64 encoded CA name (the cache key).
        :param value: JSON string listing the installed 'files' and the
                      tracked 'templates'.
        """
        ca_cn = base64.b64decode(attribute)
        data = json.loads(value)
        getcert = which('getcert')
        if getcert is not None:
            Popen([getcert, 'remove-ca', '-c', ca_cn]).wait()
            for nickname in data['templates']:
                Popen([getcert, 'stop-tracking', '-i', nickname]).wait()
        for f in data['files']:
            # lexists() instead of exists(): the list holds certificate
            # files followed by symlinks pointing at them. Once a target is
            # unlinked, exists() reports False for its symlink and the
            # dangling link would never be removed.
            if os.path.lexists(f):
                os.unlink(f)
        self.cache_remove_attribute(guid, attribute)

    def apply(self, guid, ca, applier_func, *args, **kwargs):
        """Apply the policy for one CA unless it is applied and unchanged.

        :param guid: The GPO the policy belongs to.
        :param ca: Dict describing the CA ('name' and 'hostname' are read).
        :param applier_func: Callable performing the enrollment; its return
                             value is stored in the cache.
        :param args: Positional arguments for applier_func.
        :param kwargs: Keyword arguments for applier_func.
        """
        attribute = base64.b64encode(ca['name'].encode()).decode()
        old_val = self.cache_get_attribute_value(guid, attribute)
        old_data = json.loads(old_val) if old_val is not None else {}
        # The supported templates are only queried when there is a cached
        # value to compare them against.
        templates = ['%s.%s' % (ca['name'], t.decode()) for t in get_supported_templates(ca['hostname'])] \
            if old_val is not None else []
        new_data = { 'templates': templates, **ca }
        enforce = self.cache_get_apply_state() == GPOSTATE.ENFORCE
        is_changed = changed(new_data, old_data)
        if old_val is not None:
            # If policy is already applied and unchanged, skip application
            if not is_changed and not enforce:
                return
            # If the policy has changed, unapply, then apply new policy.
            # This is only done when something is cached: unapply() parses
            # the cached JSON and cannot handle a missing value.
            self.unapply(guid, attribute, old_val)

        # Apply the policy and log the changes
        data = applier_func(*args, **kwargs)
        self.cache_add_attribute(guid, attribute, data)

    def process_group_policy(self, deleted_gpo_list, changed_gpo_list,
                             trust_dir=None, private_dir=None):
        """Unapply deleted GPOs and apply the changed ones.

        :param deleted_gpo_list: Iterable of (guid, settings) pairs whose
                                 cached settings are to be removed.
        :param changed_gpo_list: GPOs whose Registry.pol is evaluated.
        :param trust_dir: Certificate directory; defaults to the 'certs'
                          cache path of the loadparm context.
        :param private_dir: Private key directory; defaults to the 'certs'
                            private path of the loadparm context.
        """
        if trust_dir is None:
            trust_dir = self.lp.cache_path('certs')
        if private_dir is None:
            private_dir = self.lp.private_path('certs')
        if not os.path.exists(trust_dir):
            os.mkdir(trust_dir, mode=0o755)
        if not os.path.exists(private_dir):
            os.mkdir(private_dir, mode=0o700)

        for guid, settings in deleted_gpo_list:
            if str(self) in settings:
                for ca_cn_enc, data in settings[str(self)].items():
                    self.unapply(guid, ca_cn_enc, data)

        for gpo in changed_gpo_list:
            if gpo.file_sys_path:
                section = r'Software\Policies\Microsoft\Cryptography\AutoEnrollment'
                pol_file = 'MACHINE/Registry.pol'
                path = os.path.join(gpo.file_sys_path, pol_file)
                pol_conf = self.parse(path)
                if not pol_conf:
                    continue
                for e in pol_conf.entries:
                    if e.keyname == section and e.valuename == 'AEPolicy':
                        # This policy applies as specified in [MS-CAESO] 4.4.5.1
                        if e.data & 0x8000:
                            continue # The policy is disabled
                        # Only the enroll bit (0x1) is acted upon here.
                        enroll = e.data & 0x1 == 0x1
                        if enroll:
                            ca_names = self.__enroll(gpo.name,
                                                     pol_conf.entries,
                                                     trust_dir, private_dir)

                            # Cleanup any old CAs that have been removed
                            ca_attrs = [base64.b64encode(n.encode()).decode()
                                    for n in ca_names]
                            self.clean(gpo.name, keep=ca_attrs)
                        else:
                            # If enrollment has been disabled for this GPO,
                            # remove any existing policy
                            ca_attrs = \
                                self.cache_get_all_attribute_values(gpo.name)
                            self.clean(gpo.name, remove=list(ca_attrs.keys()))

    def __read_cep_data(self, guid, ldb, end_point_information,
                        trust_dir, private_dir):
        """Read CEP Data.

        [MS-CAESO] 4.4.5.3.2.4
        In this step autoenrollment initializes instances of the
        CertificateEnrollmentPolicy by accessing end points associated with CEP
        groups created in the previous step.

        :return: The names of the CAs that were enrolled with, across all
                 groups; an empty list when no group qualified.
        """
        # Collected across every group, so that the caller always receives
        # a list, also when all groups are skipped.
        ca_names = []

        # For each group created in the previous step:
        for end_point_group in end_point_information:
            # Pick an arbitrary instance of the
            # CertificateEnrollmentPolicyEndPoint from the group
            e = end_point_group[0]

            # If this instance does not have the AutoEnrollmentEnabled flag set
            # in the EndPoint.Flags, continue with the next group.
            if not e['Flags'] & 0x10:
                continue

            # If the current group contains a
            # CertificateEnrollmentPolicyEndPoint instance with EndPoint.URI
            # equal to "LDAP":
            if any(ep['URL'] == 'LDAP:' for ep in end_point_group):
                # Perform an LDAP search to read the value of the objectGuid
                # attribute of the root object of the forest root domain NC. If
                # any errors are encountered, continue with the next group.
                res = ldb.search('', SCOPE_BASE, '(objectClass=*)',
                                 ['rootDomainNamingContext'])
                if len(res) != 1:
                    continue
                res2 = ldb.search(res[0]['rootDomainNamingContext'][0],
                                  SCOPE_BASE, '(objectClass=*)',
                                  ['objectGUID'])
                if len(res2) != 1:
                    continue

                # Compare the value read in the previous step to the
                # EndPoint.PolicyId datum CertificateEnrollmentPolicyEndPoint
                # instance. If the values do not match, continue with the next
                # group.
                objectGUID = '{%s}' % \
                    octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper()
                if objectGUID != e['PolicyID']:
                    continue

            # For each CertificateEnrollmentPolicyEndPoint instance for that
            # group:
            for ca in end_point_group:
                # If EndPoint.URI equals "LDAP":
                if ca['URL'] == 'LDAP:':
                    # This is a basic configuration.
                    cas = fetch_certification_authorities(ldb)
                    for _ca in cas:
                        self.apply(guid, _ca, cert_enroll, _ca, ldb, trust_dir,
                                   private_dir)
                        ca_names.append(_ca['name'])
                # If EndPoint.URI starts with "HTTPS//":
                elif ca['URL'].lower().startswith('https://'):
                    self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir,
                               private_dir, auth=ca['auth'])
                    ca_names.append(ca['name'])
                else:
                    edata = { 'endpoint': ca['URL'] }
                    log.error('Unrecognized endpoint', edata)
        return ca_names

    def __enroll(self, guid, entries, trust_dir, private_dir):
        """Enroll with every CA configured for a GPO.

        Uses the policy server end points found in entries when there are
        any, otherwise the CAs published in the directory.

        :return: The names of the CAs enrollment was attempted with.
        """
        url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
        ldb = Ldb(url=url, session_info=system_session(),
                  lp=self.lp, credentials=self.creds)

        ca_names = []
        end_point_information = obtain_end_point_information(entries)
        if len(end_point_information) > 0:
            ca_names.extend(self.__read_cep_data(guid, ldb,
                                                 end_point_information,
                                                 trust_dir, private_dir))
        else:
            cas = fetch_certification_authorities(ldb)
            for ca in cas:
                self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir,
                           private_dir)
                ca_names.append(ca['name'])
        return ca_names

    def rsop(self, gpo):
        """Describe the auto enrollment policy a GPO would apply.

        :param gpo: The GPO to inspect.
        :return: A dict that is empty when the policy is absent or disabled,
                 otherwise {'Auto Enrollment Policy': {ca name: details}}
                 with the enrollment server, the supported templates and,
                 when known, the CA certificate.
        """
        output = {}
        pol_file = 'MACHINE/Registry.pol'
        section = r'Software\Policies\Microsoft\Cryptography\AutoEnrollment'
        if gpo.file_sys_path:
            path = os.path.join(gpo.file_sys_path, pol_file)
            pol_conf = self.parse(path)
            if not pol_conf:
                return output
            for e in pol_conf.entries:
                if e.keyname == section and e.valuename == 'AEPolicy':
                    enroll = e.data & 0x1 == 0x1
                    if e.data & 0x8000 or not enroll:
                        continue
                    output['Auto Enrollment Policy'] = {}
                    url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
                    ldb = Ldb(url=url, session_info=system_session(),
                              lp=self.lp, credentials=self.creds)
                    end_point_information = \
                        obtain_end_point_information(pol_conf.entries)
                    cas = fetch_certification_authorities(ldb)
                    if len(end_point_information) > 0:
                        cas2 = [ep for sl in end_point_information for ep in sl]
                        # The directory CAs are only reported when one of
                        # the end points is the plain LDAP one.
                        if any(ca['URL'] == 'LDAP:' for ca in cas2):
                            cas.extend(cas2)
                        else:
                            cas = cas2
                    for ca in cas:
                        if 'URL' in ca and ca['URL'] == 'LDAP:':
                            continue
                        policy = 'Auto Enrollment Policy'
                        cn = ca['name']
                        if policy not in output:
                            output[policy] = {}
                        output[policy][cn] = {}
                        if 'cACertificate' in ca:
                            output[policy][cn]['CA Certificate'] = \
                                format_root_cert(ca['cACertificate']).decode()
                        output[policy][cn]['Auto Enrollment Server'] = \
                            ca['hostname']
                        supported_templates = \
                            get_supported_templates(ca['hostname'])
                        output[policy][cn]['Templates'] = \
                            [t.decode() for t in supported_templates]
        return output