-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaufs.py
138 lines (111 loc) · 4.3 KB
/
aufs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python3
# encoding: utf-8
"""
aufs.py is a basic python wrapper around the AUFS filesystem.
For more information on AUFS, see http://aufs.sourceforge.net
"""
# Copyright (c) 2018 Łukasz Kurowski <[email protected]>
# Copyright (c) 2009-2018 Solomon Hykes <[email protected]>
#
# This file is part of aufs.py
# (see http://bitbucket.org/dotcloud/aufs.py).
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import re
import subprocess
from functools import reduce
def cleanpath(path):
    """Return *path* as an absolute, normalized filesystem path.

    Relative paths are resolved against the current working directory;
    redundant separators and ``.``/``..`` components are collapsed.
    """
    absolute = os.path.abspath(path)
    return os.path.normpath(absolute)
def exec_sh(cmd, debug=True):
    """Run *cmd* (a sequence of argument strings) and return its stdout.

    When *debug* is true, the command line is echoed to stdout first,
    prefixed with ``# ``. The child's standard output is captured in full
    and decoded as UTF-8; its exit status is not inspected.
    """
    if debug:
        command_line = " ".join(cmd)
        print("# " + command_line + "\n")
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    captured, _ = process.communicate()
    return captured.decode("utf-8")
# Matches one line of `mount` output of the form
# "<source> on <mountpoint> type <fstype> (<opt1>,<opt2>,...)".
# Groups: (1) source, (2) mountpoint (may not contain spaces),
# (3) filesystem type, (4) the raw comma-separated option string.
RE_MTAB = re.compile(r"^(.*) on ([^ ]*) type (.*) \((.*)\)$")
def mtab(typefilter=None):
    """Return a dictionary of all mounted filesystems, keyed by mountpoint.

    The table is built by running ``mount`` and parsing each output line
    with ``RE_MTAB``; lines that do not match are ignored. Every value is a
    dict with the keys ``source``, ``mountpoint`` (normalized), ``type`` and
    ``options`` (a list of option strings).

    If *typefilter* is truthy, only filesystems whose type equals it are
    included. When a mountpoint appears more than once, the last line wins.
    """
    mounts = {}
    for line in exec_sh(["mount"]).strip().split("\n"):
        matched = RE_MTAB.match(line)
        if matched is None:
            continue
        source, mountpoint, fstype, options = matched.groups()
        if typefilter and typefilter != fstype:
            continue
        entry = {
            "source": source,
            "mountpoint": os.path.normpath(mountpoint),
            "type": fstype,
            "options": options.split(","),
        }
        mounts[entry["mountpoint"]] = entry
    return mounts
def get_aufs1_branches(mtab_entry):
    """Collect branch specs from the ``br:`` mount options of *mtab_entry*.

    Each option starting with ``br:`` is split on ``:`` and everything after
    the leading ``br`` is appended, in order, to the result. Returns ``None``
    when the entry has no ``br:`` option at all.
    """
    branches = []
    found_br_option = False
    for option in mtab_entry["options"]:
        if not option.startswith("br:"):
            continue
        found_br_option = True
        # Drop the leading "br" token; the rest are the branch specs.
        branches.extend(option.split(":")[1:])
    return branches if found_br_option else None
def get_aufs2_branches(mtab_entry, sysfs_root="/sys/fs/aufs"):
    """Read the branch specs of an aufs mount from its sysfs directory.

    The mount's ``si=<id>`` option names the directory
    ``<sysfs_root>/si_<id>``, in which each ``br<N>`` file holds one branch
    spec. The specs are returned as stripped strings ordered by branch
    index ``N``.

    Args:
        mtab_entry: Mount-table entry dict with an ``options`` list.
        sysfs_root: Directory containing the ``si_<id>`` directories.

    Returns:
        A list of branch spec strings, or an empty list when the entry has
        no ``si=`` option.

    Raises:
        OSError: If the ``si_<id>`` directory or a branch file cannot be
            read.
    """
    si_ids = [
        opt.split("=", 1)[1]
        for opt in (mtab_entry.get("options") or [])
        if opt.startswith("si=")
    ]
    if not si_ids:
        # No sysfs id: report "no branches" so the caller can raise its
        # own, more descriptive error instead of an IndexError from here.
        return []
    si_dir = os.path.join(sysfs_root, "si_%s" % si_ids[0])

    # Only "br<digits>" entries are branch files; a plain "br" prefix test
    # would also pick up siblings such as "brid<N>". Sort numerically so
    # that br10 comes after br2 and the result does not depend on the
    # arbitrary order of os.listdir().
    indexed_names = []
    for name in os.listdir(si_dir):
        matched = re.fullmatch(r"br(\d+)", name)
        if matched:
            indexed_names.append((int(matched.group(1)), name))

    branches = []
    for _, name in sorted(indexed_names):
        # Text mode already yields str, so there is nothing to .decode();
        # the context manager closes each handle promptly.
        with open(os.path.join(si_dir, name), encoding="utf-8") as branch_file:
            branches.append(branch_file.read().strip())
    return branches
class AUFS:
    """An AUFS mountpoint (see http://aufs.sourceforge.net/).

    Use the ``layers`` property to read or change the AUFS layers currently
    mounted at the mountpoint. Layers are ``(path, access)`` tuples.

    Examples::

        # Mount /etc (read-only) in /tmp/etc, and store any changes in
        # /tmp/etc_changes
        AUFS('/tmp/etc').layers = [("/tmp/etc_changes", "rw"), ("/etc", "ro")]

        # Unmount /tmp/etc
        AUFS('/tmp/etc').layers = []
    """

    def __init__(self, mountpoint):
        """Remember *mountpoint* as an absolute, normalized path."""
        self.mountpoint = cleanpath(mountpoint)

    def get_layers(self):
        """Return the layers currently mounted at the mountpoint.

        Returns:
            A list of ``(path, access)`` tuples with normalized paths, or
            an empty list when nothing is mounted at the mountpoint.

        Raises:
            ValueError: If the mountpoint is mounted but no branches could
                be found for it.
        """
        mtab_entry = mtab().get(self.mountpoint)
        if not mtab_entry:
            return []
        # Try the "br:" mount options first, then fall back to sysfs.
        aufs_branches = get_aufs1_branches(mtab_entry)
        if not aufs_branches:
            aufs_branches = get_aufs2_branches(mtab_entry)
        if not aufs_branches:
            raise ValueError(
                "Can't retrieve aufs branches for %s" % self.mountpoint)
        layers = []
        for branch in aufs_branches:
            # Each branch spec has the form "<path>=<access>".
            path, access = branch.split("=", 1)
            layers.append((cleanpath(path), access))
        return layers

    def set_layers(self, layers):
        """Make *layers* the layers mounted at the mountpoint.

        Does nothing when the requested layers already match the mounted
        ones. Otherwise any existing mount is unmounted first, and a new
        aufs mount is created unless *layers* is empty.

        Args:
            layers: Iterable of ``(path, access)`` pairs.
        """
        layers = [(cleanpath(path), access) for (path, access) in layers]
        # Read the mount table once: every read spawns `mount`, and reading
        # twice could observe two different states between the comparison
        # and the unmount decision.
        current = self.layers
        if layers == current:
            return
        if current:
            exec_sh(("umount", self.mountpoint))
        if layers:
            opt = ":".join(
                ["br"] + [path + "=" + access for (path, access) in layers])
            exec_sh(("mount", "-t", "aufs", "-o", opt, "none", self.mountpoint))

    layers = property(get_layers, set_layers)