self.smb_conn = libsmb_samba_internal.Conn(self.server, "sysvol",
lp, creds)
- self.conn.mkdir(test_dir)
+ self.smb_conn.mkdir(test_dir)
def tearDown(self):
super(SMBTests, self).tearDown()
cur_dir = test_dir
for subdir in ["subdir-X", "subdir-Y", "subdir-Z"]:
path = self.make_sysvol_path(cur_dir, subdir)
- self.conn.mkdir(path)
+ self.smb_conn.mkdir(path)
dirpaths.append(path)
cur_dir = path
# create another empty dir just for kicks
path = self.make_sysvol_path(cur_dir, "another")
- self.conn.mkdir(path)
+ self.smb_conn.mkdir(path)
empty_dirs.append(path)
# create some files in these directories
# check correct result after creating and then deleting a new dir
new_dir = self.make_sysvol_path(test_dir, 'test-new')
- self.conn.mkdir(new_dir)
+ self.smb_conn.mkdir(new_dir)
self.assertTrue(self.conn.chkpath(new_dir))
- self.conn.rmdir(new_dir)
+ self.smb_conn.rmdir(new_dir)
self.assertFalse(self.conn.chkpath(new_dir))
def test_save_load_text(self):
Py_RETURN_NONE;
}
+/*
+ * Delete an empty directory
+ */
+static NTSTATUS remove_dir(struct py_cli_state *self, const char *dirname)
+{
+ NTSTATUS status;
+
+ if (self->is_smb1) {
+ struct tevent_req *req = NULL;
+
+ req = cli_rmdir_send(NULL, self->ev, self->cli, dirname);
+ if (!py_tevent_req_wait_exc(self, req)) {
+ return NT_STATUS_INTERNAL_ERROR;
+ }
+ status = cli_rmdir_recv(req);
+ TALLOC_FREE(req);
+ } else {
+ status = cli_rmdir(self->cli, dirname);
+ }
+ return status;
+}
+
+static PyObject *py_smb_rmdir(struct py_cli_state *self, PyObject *args)
+{
+ NTSTATUS status;
+ const char *dirname;
+
+ if (!PyArg_ParseTuple(args, "s:rmdir", &dirname)) {
+ return NULL;
+ }
+
+ status = remove_dir(self, dirname);
+ PyErr_NTSTATUS_IS_ERR_RAISE(status);
+
+ Py_RETURN_NONE;
+}
+
+/*
+ * Create a directory
+ */
+static PyObject *py_smb_mkdir(struct py_cli_state *self, PyObject *args)
+{
+ NTSTATUS status;
+ const char *dirname;
+
+ if (!PyArg_ParseTuple(args, "s:mkdir", &dirname)) {
+ return NULL;
+ }
+
+ if (self->is_smb1) {
+ struct tevent_req *req = NULL;
+
+ req = cli_mkdir_send(NULL, self->ev, self->cli, dirname);
+ if (!py_tevent_req_wait_exc(self, req)) {
+ return NULL;
+ }
+ status = cli_mkdir_recv(req);
+ TALLOC_FREE(req);
+ } else {
+ status = cli_mkdir(self->cli, dirname);
+ }
+ PyErr_NTSTATUS_IS_ERR_RAISE(status);
+
+ Py_RETURN_NONE;
+}
+
static PyMethodDef py_cli_state_methods[] = {
{ "settimeout", (PyCFunction)py_cli_settimeout, METH_VARARGS,
"settimeout(new_timeout_msecs) => return old_timeout_msecs" },
{ "unlink", (PyCFunction)py_smb_unlink,
METH_VARARGS,
"unlink(path) -> None\n\n \t\tDelete a file." },
+ { "mkdir", (PyCFunction)py_smb_mkdir, METH_VARARGS,
+ "mkdir(path) -> None\n\n \t\tCreate a directory." },
+ { "rmdir", (PyCFunction)py_smb_rmdir, METH_VARARGS,
+ "rmdir(path) -> None\n\n \t\tDelete a directory." },
{ NULL, NULL, 0, NULL }
};