奇技指南
We know that runc does not fully isolate the files under /proc and /sys, so the data that top, free and similar commands show inside a container actually comes from the physical host. With lxcfs, a container's /proc and /sys files can be isolated from the host so that top and friends display the container's real data. This article looks at how that works in detail.
This article is reprinted from 360雲計算 (360 Cloud Computing).
What is lxcfs
We know that runc does not fully isolate the files under /proc and /sys, so the data shown inside a container by commands like top and free is the host's. For people used to VMs and physical machines this is unfriendly, and those commands largely lose their point. The job of lxcfs is to isolate the container's /proc and /sys files from the host so that top and similar commands display the container's real data.
Notes
lxcfs is a userspace virtual filesystem built on FUSE (Filesystem in Userspace) and implemented on top of cgroups. Having a rough understanding of fuse and cgroups first will make this article easier to follow. This article does not cover installing or using lxcfs; there are plenty of good articles about that online. We mainly look at how lxcfs implements cpu online and loadavg; once these two parts are understood, the rest works much the same way.
Reading the lxcfs filesystem from inside a container
When lxcfs starts it is given a path to use as its mount point (for example /var/lib/lxcfs). From then on, reads of the files under that path (cgroup, proc, sys) go from the VFS into the kernel's fuse module, and fuse calls back into the file-operation functions implemented by lxcfs. When a container reads data from the lxcfs filesystem, glibc issues the system call, the VFS layer hands it to the fuse kernel module, and fuse calls back into the callbacks implemented in the lxcfs program; lxcfs determines the container's cgroup, reads the corresponding cgroup on the host, and computes the container's actual memory, cpu and other information. lxcfs mounts the host's cgroups into its runtime environment at /run/lxcfs/controllers, but this is not visible on the host, because the program uses unshare to create a separate mount namespace. All of the cgroup information used by lxcfs comes from /run/lxcfs/controllers; a minimal sketch of this private-mount idea follows.
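Here is a minimal standalone sketch (not lxcfs code) of that private-mount idea: unshare the mount namespace, stop mount propagation back to the host, and mount a cgroup controller at a hypothetical path (/run/lxcfs-demo). It needs root; the real implementation is cgfs_setup_controllers, shown below.
#define _GNU_SOURCE
#include <sched.h>
#include <stdio.h>
#include <sys/mount.h>
#include <sys/stat.h>
#include <sys/types.h>

int main(void)
{
    /* detach from the host's mount namespace */
    if (unshare(CLONE_NEWNS) < 0) {
        perror("unshare");
        return 1;
    }
    /* keep our mounts from propagating back to the host */
    if (mount(NULL, "/", NULL, MS_REC | MS_SLAVE, NULL) < 0) {
        perror("mount MS_SLAVE");
        return 1;
    }
    /* hypothetical mount point; lxcfs uses /run/lxcfs/controllers */
    mkdir("/run/lxcfs-demo", 0755);
    mkdir("/run/lxcfs-demo/cpu", 0755);
    /* mount the cpu controller; only this process sees the mount */
    if (mount("cpu", "/run/lxcfs-demo/cpu", "cgroup", 0, "cpu") < 0) {
        perror("mount cgroup");
        return 1;
    }
    puts("cpu cgroup mounted privately; not visible to the rest of the host");
    return 0;
}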
Source code
Since these two parts were exactly what I needed at work, I mainly cover the implementation of cpu online and loadavg. Programs such as nginx and java start a number of worker processes based on the cpu core count, and cpu online is the data source behind the related system calls. Without isolation, the core count seen inside a container is the host's: a service that should create 2 workers actually creates 40 (container 2c, host 40c), and the extra context switching causes a noticeable performance drop. I have not seen any analysis of loadavg, so it is briefly covered here as well.
Isolation effect
Physical host: 40c128g
1. cpuonline
Physical host
Container: 2c4g
2. loadavg
Physical host
Container: 2c4g
As you can see, both cpu online and load average are now isolated.
Implementation analysis
Note: the directories of the various cgroup controllers are opened before main() runs and their file descriptors are saved in fd_hierarchies; later code simply calls openat against them instead of opening and closing files on every access. This is done by declaring the collect_and_mount_subsystems function with C's __attribute__((constructor)) attribute so that it runs before main().
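As a quick illustration of the mechanism, here is a standalone sketch (not lxcfs code) showing that a constructor function runs before main():
/* Standalone sketch: a function marked with __attribute__((constructor))
 * runs before main(), which is how lxcfs gets fd_hierarchies populated early. */
#include <stdio.h>

static void __attribute__((constructor)) init_before_main(void)
{
    puts("constructor: runs before main()");
}

int main(void)
{
    puts("main: fd_hierarchies would already be populated by now");
    return 0;
}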
Let's take a look at collect_and_mount_subsystems:
static void __attribute__((constructor)) collect_and_mount_subsystems(void)
{
FILE *f;
char *cret, *line = NULL;
char cwd[MAXPATHLEN];
size_t len = 0;
int i, init_ns = -1;
bool found_unified = false;
if ((f = fopen("/proc/self/cgroup", "r")) == NULL) {
lxcfs_error("Error opening /proc/self/cgroup: %s\n", strerror(errno));
return;
}
// read the host's cgroup controllers and store them in hierarchies
while (getline(&line, &len, f) != -1) {
......
if (!store_hierarchy(line, p))
goto out;
}
/* Preserve initial namespace. */
init_ns = preserve_mnt_ns(getpid());
if (init_ns < 0) {
lxcfs_error("%s\\n", "Failed to preserve initial mount namespace.");
goto out;
}
fd_hierarchies = malloc(sizeof(int) * num_hierarchies);
if (!fd_hierarchies) {
lxcfs_error("%s\\n", strerror(errno));
goto out;
}
for (i = 0; i < num_hierarchies; i++)
fd_hierarchies[i] = -1;
cret = getcwd(cwd, MAXPATHLEN);
if (!cret)
lxcfs_debug("Could not retrieve current working directory: %s.\\n", strerror(errno));
/* This function calls unshare(CLONE_NEWNS) our initial mount namespace
* to privately mount lxcfs cgroups. */
// 關鍵是這裡,將cgroup下各個控制模塊,掛載到lxcfs進程的自由的mount ns下(/run/lxcfs/container)
if (!cgfs_setup_controllers()) {
lxcfs_error("%s\\n", "Failed to setup private cgroup mounts for lxcfs.");
goto out;
}
......
}
static bool cgfs_setup_controllers(void)
{
// the main job here is calling unshare to create a private mount namespace
if (!cgfs_prepare_mounts())
return false;
if (!cgfs_mount_hierarchies()) {
lxcfs_error("%s\\n", "Failed to set up private lxcfs cgroup mounts.");
return false;
}
if (!permute_root())
return false;
return true;
}
static bool cgfs_mount_hierarchies(void)
{
char *target;
size_t clen, len;
int i, ret;
for (i = 0; i < num_hierarchies; i++) {
char *controller = hierarchies[i];
clen = strlen(controller);
len = strlen(BASEDIR) + clen + 2;
target = malloc(len);
if (!target)
return false;
ret = snprintf(target, len, "%s/%s", BASEDIR, controller);
if (ret < 0 || ret >= len) {
free(target);
return false;
}
if (mkdir(target, 0755) < 0 && errno != EEXIST) {
free(target);
return false;
}
if (!strcmp(controller, "unified"))
ret = mount("none", target, "cgroup2", 0, NULL);
else
ret = mount(controller, target, "cgroup", 0, controller);
if (ret < 0) {
lxcfs_error("Failed mounting cgroup %s: %s\\n", controller, strerror(errno));
free(target);
return false;
}
// open each cgroup controller directory and save its file descriptor
fd_hierarchies[i] = open(target, O_DIRECTORY);
if (fd_hierarchies[i] < 0) {
free(target);
return false;
}
free(target);
}
return true;
}
In lxcfs.c, main() mainly parses the command-line arguments and then calls the fuse_main function provided by FUSE, registering the lxcfs file operations and passing in the mount point.
......
if (!fuse_main(nargs, newargv, &lxcfs_ops, opts))
......
const struct fuse_operations lxcfs_ops = {
.getattr = lxcfs_getattr,
.readlink = NULL,
.getdir = NULL,
.mknod = NULL,
.mkdir = lxcfs_mkdir,
.unlink = NULL,
.rmdir = lxcfs_rmdir,
.symlink = NULL,
.rename = NULL,
.link = NULL,
.chmod = lxcfs_chmod,
.chown = lxcfs_chown,
.truncate = lxcfs_truncate,
.utime = NULL,
.open = lxcfs_open,
.read = lxcfs_read,
.release = lxcfs_release,
.write = lxcfs_write,
.statfs = NULL,
.flush = lxcfs_flush,
.fsync = lxcfs_fsync,
.setxattr = NULL,
.getxattr = NULL,
.listxattr = NULL,
.removexattr = NULL,
.opendir = lxcfs_opendir,
.readdir = lxcfs_readdir,
.releasedir = lxcfs_releasedir,
.fsyncdir = NULL,
.init = NULL,
.destroy = NULL,
.access = lxcfs_access,
.create = NULL,
.ftruncate = NULL,
.fgetattr = NULL,
};
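To make the registration pattern above concrete, here is a minimal FUSE 2.x sketch (a hypothetical demo, not lxcfs code) that serves a single read-only file. lxcfs registers its callbacks in the same way; its handlers simply synthesize per-container /proc and /sys content instead of returning a fixed string. Mounted with e.g. ./demo /tmp/demo-mnt, a `cat /tmp/demo-mnt/online` ends up in demo_read.
#define FUSE_USE_VERSION 26
#include <fuse.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/stat.h>

static const char *demo_path = "/online";  /* hypothetical file name */
static const char *demo_data = "0-1\n";    /* pretend two cpus are online */

static int demo_getattr(const char *path, struct stat *st)
{
    memset(st, 0, sizeof(*st));
    if (strcmp(path, "/") == 0) {
        st->st_mode = S_IFDIR | 0755;
        st->st_nlink = 2;
        return 0;
    }
    if (strcmp(path, demo_path) == 0) {
        st->st_mode = S_IFREG | 0444;
        st->st_nlink = 1;
        st->st_size = strlen(demo_data);
        return 0;
    }
    return -ENOENT;
}

static int demo_readdir(const char *path, void *buf, fuse_fill_dir_t filler,
                        off_t offset, struct fuse_file_info *fi)
{
    if (strcmp(path, "/") != 0)
        return -ENOENT;
    filler(buf, ".", NULL, 0);
    filler(buf, "..", NULL, 0);
    filler(buf, demo_path + 1, NULL, 0);  /* skip the leading '/' */
    return 0;
}

static int demo_open(const char *path, struct fuse_file_info *fi)
{
    if (strcmp(path, demo_path) != 0)
        return -ENOENT;
    if ((fi->flags & O_ACCMODE) != O_RDONLY)
        return -EACCES;
    return 0;
}

static int demo_read(const char *path, char *buf, size_t size, off_t offset,
                     struct fuse_file_info *fi)
{
    size_t len = strlen(demo_data);
    if (strcmp(path, demo_path) != 0)
        return -ENOENT;
    if ((size_t)offset >= len)
        return 0;
    if (offset + size > len)
        size = len - offset;
    memcpy(buf, demo_data + offset, size);
    return size;
}

static const struct fuse_operations demo_ops = {
    .getattr = demo_getattr,
    .readdir = demo_readdir,
    .open    = demo_open,
    .read    = demo_read,
};

int main(int argc, char *argv[])
{
    /* e.g. ./demo /tmp/demo-mnt, then reads under the mount point hit demo_read */
    return fuse_main(argc, argv, &demo_ops, NULL);
}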
cpuonline
1. The cpu online information lives under /sys/devices/system/cpu/; lxcfs registers its handlers for operations on /sys with fuse (of course any path could be used here).
lxcfs.c:
const struct fuse_operations lxcfs_ops = {
......
.open = lxcfs_open,
.read = lxcfs_read,
.release = lxcfs_release,
.write = lxcfs_write,
......
}
static int lxcfs_read(const char *path, char *buf, size_t size, off_t offset,
struct fuse_file_info *fi)
{
int ret;
if (strncmp(path, "/cgroup", 7) == 0) {
up_users();
ret = do_cg_read(path, buf, size, offset, fi);
down_users();
return ret;
}
if (strncmp(path, "/proc", 5) == 0) {
up_users();
ret = do_proc_read(path, buf, size, offset, fi);
down_users();
return ret;
}
if (strncmp(path, "/sys", 4) == 0) {
up_users();
ret = do_sys_read(path, buf, size, offset, fi);
down_users();
return ret;
}
return -EINVAL;
}
static int do_sys_read(const char *path, char *buf, size_t size, off_t offset,
struct fuse_file_info *fi)
{
int (*sys_read)(const char *path, char *buf, size_t size, off_t offset,
struct fuse_file_info *fi);
char *error;
dlerror(); /* Clear any existing error */
sys_read = (int (*)(const char *, char *, size_t, off_t, struct fuse_file_info *)) dlsym(dlopen_handle, "sys_read");
error = dlerror();
if (error != NULL) {
lxcfs_error("%s\\n", error);
return -1;
}
return sys_read(path, buf, size, offset, fi);
}
The file-operation code (bindings.c, sysfs_fuse.c, cpuset.c) is packaged into the liblxcfs.so shared library, which lxcfs.c calls into. do_sys_read above uses dlsym to look up the sys_read function in liblxcfs.so; a minimal sketch of that pattern follows.
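For readers unfamiliar with dlopen/dlsym, here is a standalone sketch (not lxcfs code; link with -ldl). The library and symbol names mirror the article, and the real sys_read takes the fuse read arguments rather than being printed as a raw pointer.
#include <dlfcn.h>
#include <stdio.h>

int main(void)
{
    void *handle = dlopen("liblxcfs.so", RTLD_LAZY);
    if (!handle) {
        fprintf(stderr, "dlopen: %s\n", dlerror());
        return 1;
    }

    dlerror();                            /* clear any existing error */
    void *sym = dlsym(handle, "sys_read");
    char *error = dlerror();
    if (error != NULL) {
        fprintf(stderr, "dlsym: %s\n", error);
        dlclose(handle);
        return 1;
    }

    printf("resolved sys_read at %p\n", sym);
    dlclose(handle);
    return 0;
}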
2. Next, let's walk through how cpu online is read.
sysfs_fuse.c:
int sys_read(const char *path, char *buf, size_t size, off_t offset,
struct fuse_file_info *fi)
{
struct file_info *f = (struct file_info *)fi->fh;
switch (f->type) {
//the cpu online case; type is set at open time and is not covered further here.
//The interesting part is the sys_devices_system_cpu_online_read function below.
case LXC_TYPE_SYS_DEVICES_SYSTEM_CPU_ONLINE:
return sys_devices_system_cpu_online_read(buf, size, offset, fi);
case LXC_TYPE_SYS_DEVICES:
case LXC_TYPE_SYS_DEVICES_SYSTEM:
case LXC_TYPE_SYS_DEVICES_SYSTEM_CPU:
default:
return -EINVAL;
}
}
static int sys_devices_system_cpu_online_read(char *buf, size_t size,
off_t offset,
struct fuse_file_info *fi)
{
//get the fuse context, mainly for the pid of the process reading cpu online (e.g. the cat process of "cat /sys/devices/system/cpu/online", referred to below as the "calling process")
struct fuse_context *fc = fuse_get_context();
struct file_info *d = (struct file_info *)fi->fh;
char *cache = d->buf;
char *cg;
char *cpuset = NULL;
bool use_view;
int max_cpus = 0;
pid_t initpid;
ssize_t total_len = 0;
if (offset) {
if (!d->cached)
return 0;
if (offset > d->size)
return -EINVAL;
int left = d->size - offset;
total_len = left > size ? size : left;
memcpy(buf, cache + offset, total_len);
return total_len;
}
//get the host pid of the container's init (pid 1) process; if initpid is 0, the calling process is a process on the host
initpid = lookup_initpid_in_store(fc->pid);
if (initpid <= 0)
initpid = fc->pid;
//get the cgroup of the container's init process
//e.g. docker/368adedeb87172d68388cee9818e873d73503a5b1d1d2a6b47fbd053f6d68601
cg = get_pid_cgroup(initpid, "cpuset");
if (!cg)
return read_file("/sys/devices/system/cpu/online", buf, size, d);
prune_init_slice(cg);
cpuset = get_cpuset(cg);
if (!cpuset)
goto err;
// check whether the cpu and cpuacct controllers exist; if not, return the host's cpu online info directly
use_view = use_cpuview(cg);
if (use_view)
// get the number of cpus the container can really use; if the container has no cpu quota configured (default -1), the host's info is returned
max_cpus = max_cpu_count(cg);
if (max_cpus == 0)
return read_file("/sys/devices/system/cpu/online", buf, size, d);
if (max_cpus > 1)
total_len = snprintf(d->buf, d->buflen, "0-%d\\n", max_cpus - 1);
else
total_len = snprintf(d->buf, d->buflen, "0\\n");
if (total_len < 0 || total_len >= d->buflen) {
lxcfs_error("%s\\n", "failed to write to cache");
return 0;
}
d->size = (int)total_len;
d->cached = 1;
if (total_len > size)
total_len = size;
memcpy(buf, d->buf, total_len);
err:
free(cpuset);
free(cg);
return total_len;
}
/*
* Return the maximum number of visible CPUs based on CPU quotas.
* If there is no quota set, zero is returned.
*/
int max_cpu_count(const char *cg)
{
int rv, nprocs;
int64_t cfs_quota, cfs_period;
int nr_cpus_in_cpuset = 0;
char *cpuset = NULL;
// read the container's cpu quota value from the host
if (!read_cpu_cfs_param(cg, "quota", &cfs_quota))
return 0;
// read the container's cpu period value from the host
if (!read_cpu_cfs_param(cg, "period", &cfs_period))
return 0;
cpuset = get_cpuset(cg);
if (cpuset)
nr_cpus_in_cpuset = cpu_number_in_cpuset(cpuset);
if (cfs_quota <= 0 || cfs_period <= 0){
if (nr_cpus_in_cpuset > 0)
return nr_cpus_in_cpuset;
return 0;
}
// compute the number of cpus available to the container
rv = cfs_quota / cfs_period;
/* In case quota/period does not yield a whole number, add one CPU for
* the remainder. In other words, a 0.5-cpu limit shows up as 1 core, and 1.5 as 2.
*/
if ((cfs_quota % cfs_period) > 0)
rv += 1;
/* get the number of online cpus, i.e. sysconf(_SC_NPROCESSORS_ONLN) */
nprocs = get_nprocs();
if (rv > nprocs)
rv = nprocs;
/* use min value in cpu quota and cpuset */
if (nr_cpus_in_cpuset > 0 && nr_cpus_in_cpuset < rv)
rv = nr_cpus_in_cpuset;
return rv;
}
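Here is a standalone worked example of that quota/period arithmetic (hypothetical helper, not lxcfs code): with cpu.cfs_period_us = 100000, a quota of 50000 yields 1 visible cpu, 150000 yields 2, and -1 (no limit) yields 0 so the caller falls back to the host view.
#include <stdio.h>

static int cpus_from_quota(long long quota, long long period)
{
    if (quota <= 0 || period <= 0)
        return 0;                  /* no limit: caller falls back to the host view */
    int cpus = quota / period;
    if (quota % period > 0)        /* round a fractional cpu up: 0.5 -> 1, 1.5 -> 2 */
        cpus += 1;
    return cpus;
}

int main(void)
{
    printf("%d\n", cpus_from_quota(50000, 100000));   /* 0.5 cpu  -> 1 */
    printf("%d\n", cpus_from_quota(150000, 100000));  /* 1.5 cpus -> 2 */
    printf("%d\n", cpus_from_quota(-1, 100000));      /* unlimited -> 0 */
    return 0;
}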
// Now let's see how the quota is read.
/*
* Read cgroup CPU quota parameters from `cpu.cfs_quota_us` or `cpu.cfs_period_us`,
* depending on `param`. Parameter value is returned through `value`.
*/
static bool read_cpu_cfs_param(const char *cg, const char *param, int64_t *value)
{
bool rv = false;
char file[11 + 6 + 1]; // cpu.cfs__us + quota/period + \0
char *str = NULL;
sprintf(file, "cpu.cfs_%s_us", param);
// the key part is here
if (!cgfs_get_value("cpu", cg, file, &str))
goto err;
......
}
bool cgfs_get_value(const char *controller, const char *cgroup, const char *file, char **value)
{
int ret, fd, cfd;
size_t len;
char *fnam, *tmpc;
// get the cpu controller's directory fd by looking it up in the fd_hierarchies mentioned earlier
tmpc = find_mounted_controller(controller, &cfd);
if (!tmpc)
return false;
/* Make sure we pass a relative path to *at() family of functions.
* . + /cgroup + / + file + \0
*/
len = strlen(cgroup) + strlen(file) + 3;
fnam = alloca(len);
ret = snprintf(fnam, len, "%s%s/%s", *cgroup == '/' ? "." : "", cgroup, file);
if (ret < 0 || (size_t)ret >= len)
return false;
// fd is effectively /run/lxcfs/controllers/cpu/docker/dockerid/cpu.cfs_quota_us
fd = openat(cfd, fnam, O_RDONLY);
if (fd < 0)
return false;
// read the value of cfs_quota_us (or cfs_period_us)
*value = slurp_file(fnam, fd);
return *value != NULL;
}
loadavg
- The concept of load average: it is the average size of the active task queue over a period of time, where active tasks are those in the TASK_RUNNING or TASK_UNINTERRUPTIBLE state. Readers interested in how the kernel computes loadavg can read the kernel source.
- Unlike the other parts, loadavg requires lxcfs to run a daemon that computes the load average itself, because what we want is the load of a container (that is, of a particular cgroup), and the host has no such data. lxcfs computes the load in exactly the same way as the kernel, so the loadavg values are quite accurate. The host's load average is computed over all tasks (processes and threads), while a container's load average is computed only over the tasks inside that container; a standalone sketch of the calculation follows below.
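For reference, here is a standalone sketch of the kernel-style fixed-point calculation that lxcfs reuses. The constants FIXED_1, EXP_1, EXP_5, EXP_15 and the calc_load/LOAD_INT/LOAD_FRAC forms are the kernel's; the loop and the scaling of the active count at the call site are only for this demo.
#include <stdio.h>

#define FSHIFT   11                   /* bits of fixed-point precision */
#define FIXED_1  (1 << FSHIFT)        /* 1.0 in fixed point */
#define EXP_1    1884                 /* 1/exp(5s/1min) in fixed point */
#define EXP_5    2014                 /* 1/exp(5s/5min) */
#define EXP_15   2037                 /* 1/exp(5s/15min) */

#define LOAD_INT(x)  ((x) >> FSHIFT)
#define LOAD_FRAC(x) LOAD_INT(((x) & (FIXED_1 - 1)) * 100)

/* load(t) = load(t-1) * exp + active * (1 - exp), all in fixed point */
static unsigned long calc_load(unsigned long load, unsigned long exp,
                               unsigned long active)
{
    unsigned long newload = load * exp + active * (FIXED_1 - exp);
    if (active >= load)
        newload += FIXED_1 - 1;       /* bias upward while the load is rising */
    return newload / FIXED_1;
}

int main(void)
{
    unsigned long avenrun = 0;
    /* one minute with 2 runnable tasks, sampled every 5 seconds (12 samples) */
    for (int i = 0; i < 12; i++)
        avenrun = calc_load(avenrun, EXP_1, 2 * FIXED_1);
    unsigned long a = avenrun + FIXED_1 / 200;   /* + 0.005 for display rounding */
    /* the value is still climbing toward 2.00 because of the exponential smoothing */
    printf("1-min load: %lu.%02lu\n", LOAD_INT(a), LOAD_FRAC(a));
    return 0;
}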
1. Analysis of the loadavg daemon
The load daemon's call chain is main -> start_loadavg -> load_daemon -> load_begin (a sketch of spawning such a daemon thread follows below).
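start_loadavg and load_daemon themselves are not shown in this article; conceptually they just spawn a background thread that refreshes the load table on a fixed interval, roughly like this standalone sketch (hypothetical names, simplified; build with -pthread).
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

#define FLUSH_TIME 5                   /* refresh interval in seconds */

static volatile int loadavg_stop;

static void *load_begin_demo(void *arg)
{
    while (!loadavg_stop) {
        /* here the real daemon walks the load hash table and refreshes it */
        puts("refreshing load hash table");
        sleep(FLUSH_TIME);
    }
    return NULL;
}

int main(void)
{
    pthread_t tid;
    if (pthread_create(&tid, NULL, load_begin_demo, NULL) != 0) {
        perror("pthread_create");
        return 1;
    }
    sleep(12);                          /* let it run a couple of cycles */
    loadavg_stop = 1;
    pthread_join(tid, NULL);
    return 0;
}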
As its comment says, load_begin walks the load hash table every 5 seconds and updates the load values.
/*
* Traverse the hash table and update it.
*/
void *load_begin(void *arg)
{
......
while (1) {
if (loadavg_stop == 1)
return NULL;
time1 = clock();
for (i = 0; i < LOAD_SIZE; i++) {
pthread_mutex_lock(&load_hash[i].lock);
if (load_hash[i].next == NULL) {
pthread_mutex_unlock(&load_hash[i].lock);
continue;
}
f = load_hash[i].next;
first_node = 1;
while (f) {
......
// refresh the load for this node
sum = refresh_load(f, path);
if (sum == 0) {
f = del_node(f, i);
} else {
out: f = f->next;
}
free(path);
......
}
if (loadavg_stop == 1)
return NULL;
time2 = clock();
usleep(FLUSH_TIME * 1000000 - (int)((time2 - time1) * 1000000 / CLOCKS_PER_SEC));
}
}
主要分析下refresh_load
/*
* Return 0 means that container p->cg is closed.
* Return -1 means that error occurred in refresh.
* Positive num equals the total number of pid.
*/
static int refresh_load(struct load_node *p, char *path)
{
FILE *f = NULL;
char **idbuf;
char proc_path[256];
int i, ret, run_pid = 0, total_pid = 0, last_pid = 0;
char *line = NULL;
size_t linelen = 0;
int sum, length;
DIR *dp;
struct dirent *file;
do {
idbuf = malloc(sizeof(char *));
} while (!idbuf);
// read /sys/fs/cgroup/cpu/docker/containerid/cgroup.procs to find the pids of the processes inside the container
sum = calc_pid(&idbuf, path, DEPTH_DIR, 0, p->cfd);
/* normal exit */
if (sum == 0)
goto out;
for (i = 0; i < sum; i++) {
/*clean up '\n' */
length = strlen(idbuf[i])-1;
idbuf[i][length] = '\0';
ret = snprintf(proc_path, 256, "/proc/%s/task", idbuf[i]);
if (ret < 0 || ret > 255) {
lxcfs_error("%s\n", "snprintf() failed in refresh_load.");
i = sum;
sum = -1;
goto err_out;
}
dp = opendir(proc_path);
if (!dp) {
lxcfs_error("%s\\n", "Open proc_path failed in refresh_load.");
continue;
}
// 遍歷/proc//task 目錄(一個進程中創建的每個線程,/proc/ /task 中會創建一個相應的目錄),查找狀態為R或者D的task
while ((file = readdir(dp)) != NULL) {
if (strncmp(file->d_name, ".", 1) == 0)
continue;
if (strncmp(file->d_name, "..", 1) == 0)
continue;
total_pid++;
/* We make the biggest pid become last_pid.*/
ret = atof(file->d_name);
last_pid = (ret > last_pid) ? ret : last_pid;
ret = snprintf(proc_path, 256, "/proc/%s/task/%s/status", idbuf[i], file->d_name);
if (ret < 0 || ret > 255) {
lxcfs_error("%s\\n", "snprintf() failed in refresh_load.");
i = sum;
sum = -1;
closedir(dp);
goto err_out;
}
f = fopen(proc_path, "r");
if (f != NULL) {
while (getline(&line, &linelen, f) != -1) {
/* Find State */
if ((line[0] == 'S') && (line[1] == 't'))
break;
}
if ((line[7] == 'R') || (line[7] == 'D'))
run_pid++;
fclose(f);
}
}
closedir(dp);
}
/*Calculate the loadavg.*/
// Now that we have the number of active tasks, it is time to do the real work: compute the load average.
// The formula is the same as the kernel's: load(t) = load(t-1) * e^(-5/60) + n * (1 - e^(-5/60))
// For background see: https://www.helpsystems.com/resources/guides/unix-load-average-part-1-how-it-works
p->avenrun[0] = calc_load(p->avenrun[0], EXP_1, run_pid);
p->avenrun[1] = calc_load(p->avenrun[1], EXP_5, run_pid);
p->avenrun[2] = calc_load(p->avenrun[2], EXP_15, run_pid);
p->run_pid = run_pid;
p->total_pid = total_pid;
p->last_pid = last_pid;
free(line);
err_out:
for (; i > 0; i--)
free(idbuf[i-1]);
out:
free(idbuf);
return sum;
}
2. Reading loadavg
Once the load computation is understood, reading it is simple. One thing worth noting: an entry is inserted into the load_hash table the first time a container reads /proc/loadavg (after all, there is no way to know a container's cgroup ahead of time).
static int proc_loadavg_read(char *buf, size_t size, off_t offset,
struct fuse_file_info *fi)
{
struct fuse_context *fc = fuse_get_context();
struct file_info *d = (struct file_info *)fi->fh;
pid_t initpid;
char *cg;
size_t total_len = 0;
char *cache = d->buf;
struct load_node *n;
int hash;
int cfd, rv = 0;
unsigned long a, b, c;
if (offset) {
if (offset > d->size)
return -EINVAL;
if (!d->cached)
return 0;
int left = d->size - offset;
total_len = left > size ? size : left;
memcpy(buf, cache + offset, total_len);
return total_len;
}
if (!loadavg)
return read_file("/proc/loadavg", buf, size, d);
initpid = lookup_initpid_in_store(fc->pid);
if (initpid <= 0)
initpid = fc->pid;
cg = get_pid_cgroup(initpid, "cpu");
if (!cg)
return read_file("/proc/loadavg", buf, size, d);
prune_init_slice(cg);
hash = calc_hash(cg) % LOAD_SIZE;
// look up the node for this cgroup in the hash table
n = locate_node(cg, hash);
/* First time */
// on the first read, insert the node into the hash table first
if (n == NULL) {
if (!find_mounted_controller("cpu", &cfd)) {
/*
* In locate_node() above, pthread_rwlock_unlock() isn't used
* because delete is not allowed before read has ended.
*/
pthread_rwlock_unlock(&load_hash[hash].rdlock);
rv = 0;
goto err;
}
do {
n = malloc(sizeof(struct load_node));
} while (!n);
do {
n->cg = malloc(strlen(cg)+1);
} while (!n->cg);
strcpy(n->cg, cg);
n->avenrun[0] = 0;
n->avenrun[1] = 0;
n->avenrun[2] = 0;
n->run_pid = 0;
n->total_pid = 1;
n->last_pid = initpid;
n->cfd = cfd;
insert_node(&n, hash);
}
// from the second read onward, the values come from the daemon's calculations
a = n->avenrun[0] + (FIXED_1/200);
b = n->avenrun[1] + (FIXED_1/200);
c = n->avenrun[2] + (FIXED_1/200);
total_len = snprintf(d->buf, d->buflen, "%lu.%02lu %lu.%02lu %lu.%02lu %d/%d %d\\n",
LOAD_INT(a), LOAD_FRAC(a),
LOAD_INT(b), LOAD_FRAC(b),
LOAD_INT(c), LOAD_FRAC(c),
n->run_pid, n->total_pid, n->last_pid);
pthread_rwlock_unlock(&load_hash[hash].rdlock);
if (total_len < 0 || total_len >= d->buflen) {
lxcfs_error("%s\\n", "Failed to write to cache");
rv = 0;
goto err;
}
d->size = (int)total_len;
d->cached = 1;
if (total_len > size)
total_len = size;
memcpy(buf, d->buf, total_len);
rv = total_len;
err:
free(cg);
return rv;
}