# coding: utf-8
import pandas as pd
import re
import json
import base64
import requests
import os
import subprocess
from datetime import datetime
import traceback
import logging
from urllib3.exceptions import NewConnectionError
import urllib3
# for SSL Cert Verification error
from urllib3.exceptions import InsecureRequestWarning
# Silence urllib3's InsecureRequestWarning for the whole process.
urllib3.disable_warnings(InsecureRequestWarning)
# Version string of this library.
VCP_LIB_VERSION = "23.04.0+20230401"
# URL scheme constant (not referenced in the visible part of this file).
VCP_SCHEMA = "https"
# Logger shared by every function and class in this module.
logger = logging.getLogger("vcp.sdk")
# Privileged mode switch (ON/OFF); when True, api_call() adds the
# "x-vcc-privileged" request header.
PrivilegedMode = False
# API call helpers
#
# Leftover usage examples of the requests API for other HTTP verbs:
# requests.put('http://127.0.0.1:5000/put',{'foo': 'bar'}) #put
# requests.delete('http://127.0.0.1:5000/delete') #delete
# requests.head('http://127.0.0.1:5000/get') #HEAD
# requests.options('http://127.0.0.1:5000/options') #OPTIONS
def set_privilege_mode(mode):
    """
    Turn the module-wide privileged mode on.

    Only the literal ``True`` has an effect; any other value (including
    truthy ones such as ``1``) leaves the current mode untouched.

    :param mode: pass ``True`` to enable privileged mode
    """
    global PrivilegedMode
    if mode is not True:
        return
    PrivilegedMode = True
def unset_privilege_mode(privileged=None):
    """
    Turn the module-wide privileged mode off.

    The mode is cleared only when ``privileged`` is the literal ``True``,
    i.e. only by the same layer that enabled it via set_privilege_mode();
    every other value is a no-op.

    :param privileged: pass ``True`` to disable privileged mode
    """
    global PrivilegedMode
    if privileged is not True:
        return
    PrivilegedMode = False
def api_call(occtr, add_uri, req_type="GET", params=None):
    """
    Call the REST API of the VC Controller.

    :param occtr: VCP library base object; ``api_end_point``, ``verbose``,
        ``vcc_access_token`` and ``insecure_request_warning`` are read from it
    :param add_uri: REST API endpoint path appended to ``occtr.api_end_point``
    :param req_type: HTTP request type (GET/POST/PATCH/PUT/DELETE)
    :param params: payload that is JSON-encoded into the body of every
        non-GET request (defaults to an empty dict)
    :return: [http_status, response_body]
    :raises Exception: ``("VCP", "server error")`` when the connection to the
        server cannot be established, ``("VCP", "unsupported request type: ...")``
        when ``req_type`` is not one of the supported verbs
    """
    payload = {} if params is None else params

    # API endpoint
    url = occtr.api_end_point + add_uri

    if occtr.verbose >= 1:
        logger.debug("[{}] {} privileged({})".format(req_type, url, PrivilegedMode))

    # Authentication headers
    headers = {
        "x-vcc-access-token": occtr.vcc_access_token,
        "Content-Type": "application/json",
    }
    if PrivilegedMode:
        headers["x-vcc-privileged"] = "True"

    # Reject unknown verbs explicitly instead of failing later with an
    # UnboundLocalError on the response variable.
    if req_type not in ("GET", "POST", "PUT", "PATCH", "DELETE"):
        raise Exception("VCP", "unsupported request type: {}".format(req_type))

    try:
        verify = occtr.insecure_request_warning
        # GET carries no body; every other verb sends the JSON payload.
        body = None if req_type == "GET" else json.dumps(payload)
        res = requests.request(
            req_type, url, verify=verify, headers=headers, data=body
        )
    except (NewConnectionError, requests.exceptions.ConnectionError) as err:
        # requests wraps urllib3's NewConnectionError in its own
        # ConnectionError, so both must be caught to report a server error.
        logger.error(traceback.format_exc())
        raise Exception("VCP", "server error") from err

    http_status = res.status_code
    result = res.text
    if occtr.verbose >= 2:
        logger.debug("HTTP STATUS %d" % http_status)
    if occtr.verbose >= 3:
        logger.debug("BODY = %s" % result)
    return [http_status, result]
def vcp_to_status_color(df):
    """
    Build per-cell CSS for a column of a DataFrame such as the VC list.

    Cells are coloured by status only when the column name contains
    "state"; every other cell gets an empty style string.

    :param df: one column (a named sequence of values) handed over by
        ``DataFrame.style.apply``
    :return: list of CSS strings, one per cell
    """
    status_colors = {
        "BOOTING": "#fe9",
        "APPLYING": "#fe9",
        "ERROR": "#f99",
        "RUNNING": "#dfd",
        "SUSPENDING": "#ddd",
        "SUSPENDED": "#ddd",
        "RESUMING": "#ddd",
        "STOPPING": "#ddd",
        "STOPPED": "#ddd",
        "SHUTTING_DOWN": "#add",
        "POWER_OFF": "#aaa",
        "DELETING": "#777",
        "CONTAINER_ERROR": "#f77",
        "HOST_ERROR": "#f77",
    }

    def css_for(status):
        # Colour only known statuses inside a "state" column.
        if "state" in df.name and status in status_colors:
            return "background-color: %s" % status_colors[status]
        return ""

    return [css_for(status) for status in df]
def vcp_df_color(dfs):
    """
    Apply status colouring to a DataFrame or a list of DataFrames.

    A list with one element is unwrapped; a list with several elements is
    concatenated into one DataFrame (index reset). Anything that is not a
    DataFrame after that step, including an empty list, is returned as is.

    :param dfs: a DataFrame, a list of DataFrames, or any other value
    :return: a pandas Styler coloured by vcp_to_status_color, or the
        unchanged input when it is not a DataFrame
    """
    df = dfs
    if isinstance(dfs, list):
        if len(dfs) == 1:
            df = dfs[0]
        elif len(dfs) > 1:
            df = pd.concat(dfs, sort=False, ignore_index=True)
    # isinstance instead of matching the type's string representation, so
    # DataFrame subclasses are styled as well.
    if not isinstance(df, pd.DataFrame):
        return df
    return df.style.apply(vcp_to_status_color, axis=0)
def vcp_short_id(id):
    """
    Convert a vcid to its short form: the first 8 characters plus "...".

    :param id: full identifier
    :return: shortened identifier
    """
    prefix = id[:8]
    return prefix + "..."
def vcp_server_message(body):
    """
    Turn a VC Controller API response body into a readable message.

    When ``body`` is a JSON object with a string "message" entry, that
    message is returned with backslash-escaped quotes/backslashes unescaped
    and literal ``\\n`` sequences turned into real newlines. In every other
    case ``body`` is returned unchanged.

    :param body: response body (expected to be JSON text)
    :return: the unescaped message, or ``body`` itself
    """
    # Is the body JSON at all? (TypeError covers non-string bodies.)
    try:
        json_body = json.loads(body)
    except (TypeError, ValueError):
        return body

    # Only a JSON object can carry a "message" key.
    if not isinstance(json_body, dict) or "message" not in json_body:
        return body
    s = json_body["message"]
    if not isinstance(s, str):
        return body

    # Unescape: drop the backslash in front of ', " and \.
    quoted = "'\"\\"
    escape = "\\"
    pattern = "%s([%s])" % (re.escape(escape), re.escape(quoted))
    # The replacement must be a raw string: "\1" would be the control
    # character chr(1) rather than a reference to the captured group.
    message = re.sub(pattern, r"\1", s)
    return message.replace("\\n", "\n")
def vcp_server_error(http_status, error_string, body):
    """
    Log a VC Controller API error and raise it to the caller.

    :param http_status: HTTP status returned by the API
    :param error_string: error description
    :param body: response body
    :raises Exception: always, as ``("VCP", error_string)``
    """
    # Error caused by the VCP (occtr) API. logger.error is used because this
    # function is not called from an exception handler, where
    # logger.exception would append a meaningless "NoneType: None" traceback.
    logger.error("{}: http_status({})".format(error_string, http_status))
    logger.error(vcp_server_message(body))
    raise Exception("VCP", error_string)
#
# VCP Node Class
#
# [docs] (residue of the Sphinx "view source" link; not part of the code)
class VcNode:
    """
    VcNode resource of the VCP library.

    Wraps one entry of a unit: a compute node or a storage disk, built from
    the node JSON returned by the VC Controller.
    """

    # Path of the docker CLI used by every docker_* method.
    _DOCKER_BIN = "/usr/local/bin/docker"

    def __init__(self, unit, node_json):
        """
        :param unit: owning unit; ``unit.vc``, ``unit.type`` and ``unit.name``
            are read from it
        :param node_json: node information (dict) from the VC Controller
        """
        self.type = ""  # compute | storage
        self.unit = unit
        self.vc = unit.vc
        self.occtr = self.vc.occtr
        self.node_json = node_json
        self.setup_property(node_json)
        self.ssh_user_name = "root"  # default

    def setup_property(self, node_json):
        """
        Store the node information (json) from the VC Controller in properties.

        - node_no ... number inside the unit
        - node_id ... node identifier
        - disk_no ... number inside the unit
        - disk_id ... node identifier
        - used_state ... attach state
        - cdate ... creation date
        - name ... node name
        - error_message ... error message
        - state ... state
        - cloud_instance_address ... IP address on the cloud
        - cloud_instance_id ... node identifier on the cloud
        - volumes ... external disk information
        - boot_date ... date the cloud instance was started
        - container_date ... date the Base container was started
        - watch_mode ... monitoring state (True: monitored, False: stopped)

        :param node_json: node information (dict)
        """
        # for DEBUG: a storage entry arrived although the unit is not
        # marked as storage -> correct the unit type.
        if self.unit.type != "storage" and "disk_no" in node_json:
            self.unit.type = "storage"

        if self.unit.type == "compute" or self.unit.type is None:  # compute
            self.type = "compute"
            self.no = node_json["node_no"]
            self.id = node_json["node_id"]
            self.cloud_instance_address = node_json.get("cloud_instance_address", "")
            self.cloud_instance_id = node_json.get("cloud_instance_id", "")
            self.volumes = node_json.get("volumes", [])
            self.used_state = None
            self.cdate = None
            self.cloud_disk_id = None
            self.cloud_disk_size = None
            self.boot_date = node_json.get("boot_date", "")
            self.container_date = node_json.get("container_date", "")
            self.watch_mode = node_json.get("watch_mode", "")
        else:  # storage
            self.type = "storage"
            self.no = node_json["disk_no"]
            self.id = node_json["disk_id"]
            self.used_state = "in-use" if node_json.get("in_use") else "un-use"
            self.cdate = node_json.get("cdate", "")
            self.cloud_disk_id = node_json.get("cloud_disk_id", "")
            self.cloud_disk_size = node_json.get("cloud_disk_size", "")
            self.cloud_instance_address = None
            self.cloud_instance_id = None
            self.volumes = None

        # NOTE(review): the provider value used to be written to self.name
        # and was overwritten on the very next line; it is kept in its own
        # attribute instead -- confirm this matches the intent.
        self.provider = node_json.get("provider", "")
        self.name = node_json.get("name", "")
        self.error_message = node_json.get("error_message", "")
        self.state = node_json.get("state", "")

    # ------------------------------------------------------------------
    # private helpers for the VC Controller API
    # ------------------------------------------------------------------
    def _require_compute(self, operation):
        """Raise unless this node is a compute node."""
        if self.type != "compute":
            # operation only compute
            raise Exception("VCP", operation + " only compute node")

    def _node_uri(self, action=None):
        """Return the API path of this node, optionally with an action suffix."""
        uri = "/vcs/" + self.vc.vcid + "/" + self.unit.name + "/" + self.id
        if action is None:
            return uri
        return uri + "/" + action

    def _call_node_api(self, action, error_string, req_type="PATCH", expected_status=202):
        """Call the node API; on an unexpected status record it and raise."""
        results = api_call(self.occtr, self._node_uri(action), req_type=req_type)
        http_status = results[0]
        response_body = results[1]
        if http_status != expected_status:
            self.last_error_http_status = http_status
            vcp_server_error(http_status, error_string, response_body)
        return response_body

    def _compute_action(self, action, error_string, req_type="PATCH"):
        """Run a compute-only node action and return True on success."""
        self._require_compute(action)
        self._call_node_api(action, error_string, req_type=req_type)
        return True

    # ------------------------------------------------------------------
    # VC Controller operations
    # ------------------------------------------------------------------
    def config(self):
        """
        Fetch the latest node information from the VC Controller and store it.

        :return: True
        """
        response_body = self._call_node_api(
            None, "config node failed", req_type="GET", expected_status=200
        )
        self.node = json.loads(response_body)
        # Keep the attribute set by __init__ in sync with the fresh data.
        self.node_json = self.node
        self.setup_property(self.node)
        return True

    def delete(self):
        """
        Delete the node.

        :return: True
        """
        self._call_node_api(None, "delete node failed", req_type="DELETE")
        return True

    def suspend(self):
        """
        Node Suspend (docker stop). Compute nodes only.
        """
        return self._compute_action("suspend", "suspend node failed")

    def resume(self):
        """
        Node Resume (docker start). Compute nodes only.
        """
        return self._compute_action("resume", "resume node failed")

    def stop(self):
        """
        Node Stop (cloud instance terminate). Compute nodes only.
        """
        return self._compute_action("stop", "stop node failed")

    def start(self):
        """
        Node Start (cloud instance create). Compute nodes only.
        """
        return self._compute_action("start", "start node failed")

    def backup(self):
        """
        Back up the node's container image on the VC Controller.
        Compute nodes only. Issued as a GET request.
        """
        return self._compute_action("backup", "backup failed", req_type="GET")

    def watch(self):
        """
        Start monitoring the node. Compute nodes only.
        """
        return self._compute_action("watch", "watch failed")

    def unwatch(self):
        """
        Stop monitoring the node. Compute nodes only.
        """
        return self._compute_action("unwatch", "unwatch failed")

    def power_off(self):
        """
        Power off the instance of the VcNode. Compute nodes only.
        """
        return self._compute_action("power_off", "power_off failed")

    def power_on(self):
        """
        Power on the instance of the VcNode. Compute nodes only.
        """
        return self._compute_action("power_on", "power_on failed")

    # ------------------------------------------------------------------
    # ssh / scp
    # ------------------------------------------------------------------
    def local_exec(self, cmd, env=None, stdin_fd=None):
        """Run a local command (used to launch ssh/scp).

        :param cmd: command line as an argument list
        :param env: environment variables
        :param stdin_fd: file object used as stdin (DEVNULL when omitted)
        :return: (stdout, stderr) as bytes
        :raises Exception: when the command exits with a non-zero status
        """
        proc = subprocess.Popen(
            args=cmd,
            env=env,
            stdin=stdin_fd if stdin_fd else subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        (stdout, stderr) = proc.communicate()
        if proc.returncode != 0:
            raise Exception(
                "stdout = {}\nstderr = {}".format(
                    stdout.decode("utf-8"), stderr.decode("utf-8")
                )
            )
        return (stdout, stderr)

    def ssh_exec(
        self,
        cmd,
        cwd=None,
        private_key_path=None,
        stdin_fd=None,
        output_decode=True,
        ssh_user_name=None,
    ):
        """
        Run a command over ssh on the BaseContainer of the VcNode.

        :param cmd: command string to run on the BaseContainer
        :param cwd: working directory on the BaseContainer
        :param private_key_path: private key to use instead of the one given
            when the VcNode was started
        :param stdin_fd: file object passed to ssh as stdin
        :param output_decode: True: decode the output as UTF-8 (default) /
            False: return raw bytes
        :param ssh_user_name: ssh user name; defaults to ``self.ssh_user_name``
        :return: (stdout, stderr) of the command

        Sample code

        .. code-block:: python

            (out, err) = node.ssh_exec("ls -la")
            print(out)
            fd = open("sample.txt")
            (out, err) = node.ssh_exec("sed 's/abc/def/' > remote_sample.txt", stdin_fd=fd)
        """
        self._require_compute("ssh_exec")

        # host information of the VcNode
        ip_address = self.cloud_instance_address
        if ssh_user_name is None:
            ssh_user_name = self.ssh_user_name
        host = f"{ssh_user_name}@{ip_address}"

        exec_items = ["ssh", host]
        if private_key_path:
            # explicit private key
            exec_items += ["-i", private_key_path]
        if cwd:
            # chain with `&&` so the command is skipped when chdir fails
            exec_items.append(f"cd {cwd} && {cmd}")
        else:
            exec_items.append(cmd)

        (stdout, stderr) = self.local_exec(exec_items, stdin_fd=stdin_fd)
        if output_decode:
            return (stdout.decode("utf-8"), stderr.decode("utf-8"))
        return (stdout, stderr)

    def add_publickey(
        self, add_publickey_path, private_key_path=None, ssh_user_name=None
    ):
        """
        Append a public key to ./.ssh/authorized_keys on the BaseContainer.

        :param add_publickey_path: path of the public key to add
        :param private_key_path: private key to use instead of the one given
            when the VcNode was started
        :param ssh_user_name: ssh user name; defaults to ``self.ssh_user_name``
        :return: (stdout, stderr) of the remote command
        :raises Exception: when the public key file cannot be opened
        """
        self._require_compute("add_publickey")
        try:
            stdin_fd = open(add_publickey_path, "r")
        except OSError as e:
            raise Exception(
                "VCP",
                "public_key_path[{}] can't open ({})".format(add_publickey_path, e),
            )
        # Close the key file once ssh has consumed it.
        with stdin_fd:
            return self.ssh_exec(
                "cat >> ./.ssh/authorized_keys",
                private_key_path=private_key_path,
                stdin_fd=stdin_fd,
                ssh_user_name=ssh_user_name,
            )

    def scp(self, src, dst, private_key_path=None, ssh_user_name=None):
        """
        Copy a local file onto the BaseContainer of the VcNode (scp).

        :param src: local file path
        :param dst: path on the BaseContainer
        :param private_key_path: private key to use instead of the one given
            when the VcNode was started
        :param ssh_user_name: ssh user name; defaults to ``self.ssh_user_name``
        :return: (stdout, stderr) as bytes
        """
        self._require_compute("ssh_scp")

        # host information of the VcNode
        ip_address = self.cloud_instance_address
        if ssh_user_name is None:
            ssh_user_name = self.ssh_user_name
        host = f"{ssh_user_name}@{ip_address}"

        exec_items = ["scp"]
        if private_key_path:
            # explicit private key
            exec_items += ["-i", private_key_path]
        exec_items += [src, f"{host}:{dst}"]
        return self.local_exec(exec_items)

    # ------------------------------------------------------------------
    # docker helpers
    # ------------------------------------------------------------------
    def _docker_output2df(self, output):
        """
        Convert the tabular output of ``docker ps`` / ``docker ... ls`` to a
        DataFrame (all values are strings).

        Example input::

            CONTAINER ID   IMAGE         COMMAND    CREATED          STATUS ...
            6550ed474964   hello-world   "/hello"   11 seconds ago   Exited ...

        :param output: command output text
        :return: DataFrame, or None when the output is empty
        """
        if not output:
            return None

        lines = output.split("\n")
        header = lines[0]
        # Columns are separated by two or more spaces; single spaces occur
        # inside titles ("CONTAINER ID") and values ("11 seconds ago").
        column_gap = r" {2,}"
        titles = [t for t in re.split(column_gap, header.strip()) if t]

        # Record the start column of every title.
        title_pos = []
        ds = {}
        start_pos = 0
        for title in titles:
            found_pos = header.find(title, start_pos)
            title_pos.append(found_pos)
            start_pos = found_pos + len(title)
            ds[title] = []

        for line in lines[1:]:
            if line == "":
                break
            for title, pos in zip(titles, title_pos):
                if len(line) <= pos:
                    value = ""
                else:
                    # An empty cell starts with the gap itself -> "".
                    value = re.split(column_gap, line[pos:])[0]
                ds[title].append(value)
        return pd.DataFrame(ds)

    @staticmethod
    def _arg_list(items):
        """Normalise an optional argument collection to a new list."""
        if items is None:
            return []
        if isinstance(items, str):
            return [items]
        return list(items)

    def _docker(self, args, ssh_user_name, private_key_path, cwd=None):
        """Run ``docker <args>`` on the BaseContainer; return (stdout, stderr).

        Arguments are joined with single spaces and are not shell-quoted.
        """
        cmd = " ".join([self._DOCKER_BIN] + args)
        return self.ssh_exec(
            cmd,
            cwd=cwd,
            ssh_user_name=ssh_user_name,
            private_key_path=private_key_path,
        )

    def docker_ps(self, options=None, ssh_user_name="root", private_key_path=None):
        """
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        :return: the output as a DataFrame
        """
        args = ["ps"] + self._arg_list(options)
        (stdout, stderr) = self._docker(args, ssh_user_name, private_key_path)
        return self._docker_output2df(stdout)

    def docker_inspect(self, container, options=None, ssh_user_name="root", private_key_path=None):
        """
        :param container: target container
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        :return: the output parsed from JSON
        """
        # XXX JSON output is assumed, so the -f option should be ignored
        args = ["inspect"] + self._arg_list(options) + [container]
        (stdout, stderr) = self._docker(args, ssh_user_name, private_key_path)
        return json.loads(stdout)

    def docker_image_ls(self, options=None, ssh_user_name="root", private_key_path=None):
        """
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        :return: the output as a DataFrame
        """
        args = ["image", "ls"] + self._arg_list(options)
        (stdout, stderr) = self._docker(args, ssh_user_name, private_key_path)
        return self._docker_output2df(stdout)

    def docker_image_inspect(
        self, image, options=None, ssh_user_name="root", private_key_path=None
    ):
        """
        :param image: target image
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        :return: the output parsed from JSON
        """
        # XXX JSON output is assumed, so the -f option should be ignored
        args = ["image", "inspect"] + self._arg_list(options) + [image]
        (stdout, stderr) = self._docker(args, ssh_user_name, private_key_path)
        return json.loads(stdout)

    def docker_network_ls(self, options=None, ssh_user_name="root", private_key_path=None):
        """
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        :return: the output as a DataFrame
        """
        args = ["network", "ls"] + self._arg_list(options)
        (stdout, stderr) = self._docker(args, ssh_user_name, private_key_path)
        return self._docker_output2df(stdout)

    def docker_network_inspect(
        self, network, options=None, ssh_user_name="root", private_key_path=None
    ):
        """
        :param network: target network
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        :return: the output parsed from JSON
        """
        # XXX JSON output is assumed, so the -f option should be ignored
        args = ["network", "inspect"] + self._arg_list(options) + [network]
        (stdout, stderr) = self._docker(args, ssh_user_name, private_key_path)
        return json.loads(stdout)

    def docker_volume_ls(self, options=None, ssh_user_name="root", private_key_path=None):
        """
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        :return: the output as a DataFrame
        """
        args = ["volume", "ls"] + self._arg_list(options)
        (stdout, stderr) = self._docker(args, ssh_user_name, private_key_path)
        return self._docker_output2df(stdout)

    def docker_volume_inspect(
        self, volume, options=None, ssh_user_name="root", private_key_path=None
    ):
        """
        :param volume: target volume
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        :return: the output parsed from JSON
        """
        # XXX JSON output is assumed, so the -f option should be ignored
        args = ["volume", "inspect"] + self._arg_list(options) + [volume]
        (stdout, stderr) = self._docker(args, ssh_user_name, private_key_path)
        return json.loads(stdout)

    def docker_info(self, options=None, ssh_user_name="root", private_key_path=None):
        """
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        :return: standard output as a string
        """
        args = ["info"] + self._arg_list(options)
        (stdout, stderr) = self._docker(args, ssh_user_name, private_key_path)
        return stdout

    def docker_logs(self, container, options=None, ssh_user_name="root", private_key_path=None):
        """
        :param container: target container
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        :return: standard output of ``docker logs`` as a string
        """
        args = ["logs"] + self._arg_list(options) + [container]
        (stdout, stderr) = self._docker(args, ssh_user_name, private_key_path)
        return stdout

    def docker_cp(
        self, src, dest, options=None, cwd=None, ssh_user_name="root", private_key_path=None
    ):
        """Copy files between the Base container and an App container.

        :param src: source file path
        :param dest: destination file path
        :param options: command line options
        :param cwd: working directory
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        :return: standard output as a string
        """
        args = ["cp"] + self._arg_list(options) + [src, dest]
        (stdout, stderr) = self._docker(args, ssh_user_name, private_key_path, cwd=cwd)
        return stdout

    def docker_run(self, image, options=None, cmd=None, ssh_user_name="root", private_key_path=None):
        """
        :param image: image to run
        :param options: command line options
        :param cmd: command line to run inside the container
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        :return: tuple of standard output and standard error
        """
        args = ["run"] + self._arg_list(options) + [image] + self._arg_list(cmd)
        (stdout, stderr) = self._docker(args, ssh_user_name, private_key_path)
        return (stdout, stderr)

    def docker_exec(
        self, container, options=None, cmd=None, ssh_user_name="root", private_key_path=None
    ):
        """
        :param container: target container
        :param options: command line options
        :param cmd: command line to run inside the container
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        :return: tuple of standard output and standard error
        """
        args = ["exec"] + self._arg_list(options) + [container] + self._arg_list(cmd)
        (stdout, stderr) = self._docker(args, ssh_user_name, private_key_path)
        return (stdout, stderr)

    def docker_restart(self, container, options=None, ssh_user_name="root", private_key_path=None):
        """Restart an App container.

        :param container: target container
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        """
        args = ["restart"] + self._arg_list(options) + [container]
        self._docker(args, ssh_user_name, private_key_path)

    def docker_stop(self, container, options=None, ssh_user_name="root", private_key_path=None):
        """Stop an App container.

        :param container: target container
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        """
        args = ["stop"] + self._arg_list(options) + [container]
        self._docker(args, ssh_user_name, private_key_path)

    def docker_start(self, container, options=None, ssh_user_name="root", private_key_path=None):
        """Start an App container.

        :param container: target container
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        """
        args = ["start"] + self._arg_list(options) + [container]
        self._docker(args, ssh_user_name, private_key_path)

    def docker_rm(self, container, options=None, ssh_user_name="root", private_key_path=None):
        """Remove an App container.

        :param container: target container
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        """
        args = ["rm"] + self._arg_list(options) + [container]
        self._docker(args, ssh_user_name, private_key_path)

    def docker_pull(self, image, options=None, ssh_user_name="root", private_key_path=None):
        """
        :param image: target image
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        :return: standard output as a string
        """
        args = ["pull"] + self._arg_list(options) + [image]
        (stdout, stderr) = self._docker(args, ssh_user_name, private_key_path)
        return stdout

    def docker_push(self, image, options=None, ssh_user_name="root", private_key_path=None):
        """
        :param image: target image
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        :return: standard output as a string
        """
        args = ["push"] + self._arg_list(options) + [image]
        (stdout, stderr) = self._docker(args, ssh_user_name, private_key_path)
        return stdout

    def docker_image_rm(self, image, options=None, ssh_user_name="root", private_key_path=None):
        """
        :param image: target image
        :param options: command line options
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        """
        args = ["image", "rm"] + self._arg_list(options) + [image]
        self._docker(args, ssh_user_name, private_key_path)

    def docker_image_tag(
        self, src_image, dest_image, ssh_user_name="root", private_key_path=None
    ):
        """
        :param src_image: original image name
        :param dest_image: image name after tagging
        :param ssh_user_name: user name for the SSH connection (default "root")
        :param private_key_path: path of the SSH private key
        """
        args = ["image", "tag", src_image, dest_image]
        self._docker(args, ssh_user_name, private_key_path)

    # ------------------------------------------------------------------
    # presentation
    # ------------------------------------------------------------------
    def __str__(self):
        """Return a multi-line, human-readable summary of the node."""
        res = "[{}]\n".format(self.__class__.__name__)
        res += "+ type[{type}] no({no}) state[{state}] id[{id}] ".format(
            no=self.no, state=self.state, id=self.id, type=self.type
        )
        if self.type == "compute":  # compute
            res += "cloud_instance_address[{address}] cloud_instance_id[{instance_id}] ".format(
                address=self.cloud_instance_address, instance_id=self.cloud_instance_id
            )
            res += "boot_date[{boot_date}] container_date[{container_date}] ".format(
                boot_date=self.boot_date, container_date=self.container_date
            )
            res += "watch_mode[{watch_mode}]".format(watch_mode=self.watch_mode)
            # Truthiness also tolerates a null "volumes" entry.
            if self.volumes:
                res += "\n"
                for volume in self.volumes:
                    device_name = volume.get("device_name", "none")
                    res += "\tvolume_id[{volume_id}] device_name[{device_name}]\n".format(
                        volume_id=volume["volume_id"], device_name=device_name
                    )
        else:  # storage
            res += "cdate[{cdate}] ".format(cdate=self.cdate)
            res += "used_state[{used_state}] ".format(used_state=self.used_state)
            res += "cloud_disk_id[{cloud_disk_id}] cloud_disk_size[{cloud_disk_size}] ".format(
                cloud_disk_id=self.cloud_disk_id, cloud_disk_size=self.cloud_disk_size
            )
        if self.error_message:
            res += "*** HAS ERROR:\n---\n{}\n---\n".format(self.error_message)
        res += "\n"
        return res

    def error(self):
        """
        Return the error information of the node.

        Falls back to the owning unit's ``error()`` when the node itself has
        no error message.
        """
        if not self.error_message:
            # check the parent unit
            return self.unit.error()
        return self.error_message
#
# VCP Unit Class
#
class VcUnit:
"""
VCP Lib のUnitリソース
"""
def __init__(self, vc, unit_json):
self.type = "" # compute | storage
self.occtr = vc.occtr
self.vc = vc
self.unit = unit_json
self.setup_property(unit_json)
self.nodes = []
if self.vc.type == "compute" or self.vc.type is None:
for node in self.unit["node"]:
self.nodes.append(VcNode(self, node))
else: # stroage
for disk in self.unit["disk"]:
self.nodes.append(VcNode(self, disk))
def setup_property(self, unit_json):
"""
VC Controllerから取得したUnit情報(json)を各propertyに保存
- name ... unit名
- state ... 状態
- disk ... disk情報
- error_message ... エラー情報
"""
self.type = "compute"
if self.vc.type == "disk": # storage
self.type = "storage"
self.error_message = (
unit_json["error_message"] if "error_message" in unit_json else ""
)
self.name = unit_json["name"] if "name" in unit_json else ""
self.state = unit_json["state"] if "state" in unit_json else ""
def config(self):
    """
    Fetch the unit information from the VC Controller and rebuild the
    properties and the node list from it.

    :return: True
    """
    endpoint = "/".join(("/vcs", self.vc.vcid, self.name))
    reply = api_call(self.occtr, endpoint)
    status, body = reply[0], reply[1]
    if status != 200:
        self.last_error_http_status = status
        vcp_server_error(status, "config unit failed", body)

    self.unit = json.loads(body)
    self.setup_property(self.unit)

    # Recreate the members: compute nodes first, then disks.
    self.nodes = []
    for key in ("node", "disk"):
        if key in self.unit:
            for entry in self.unit[key]:
                self.nodes.append(VcNode(self, entry))
    return True
def delete(self, force=False):
    """
    Delete the unit.

    :param force: True to delete the unit together with the nodes it
        still contains
    :return: True
    """
    # forced-delete option, sent as the strings "true" / "false"
    params = {"force": "true" if force else "false"}
    endpoint = "/".join(("/vcs", self.vc.vcid, self.name))
    reply = api_call(self.occtr, endpoint, req_type="DELETE", params=params)
    status, body = reply[0], reply[1]
    if status != 202:
        self.last_error_http_status = status
        vcp_server_error(status, "delete unit failed", body)
    return True
def suspend(self):
"""
Unit Suspend(docker stop)
"""
if self.type != "compute":
# operation only compute
raise Exception("VCP", "suspend only compute unit")
add_uri = "/vcs/" + self.vc.vcid + "/" + self.name + "/suspend"
results = api_call(self.occtr, add_uri, req_type="PATCH")
http_status = results[0]
response_body = results[1]
if http_status != 202:
self.last_error_http_status = http_status
vcp_server_error(http_status, "suspend unit failed", response_body)
return True
def resume(self):
"""
Unit Resume(docker start)
"""
if self.type != "compute":
# operation only compute
raise Exception("VCP", "resume only compute unit")
add_uri = "/vcs/" + self.vc.vcid + "/" + self.name + "/resume"
results = api_call(self.occtr, add_uri, req_type="PATCH")
http_status = results[0]
response_body = results[1]
if http_status != 202:
self.last_error_http_status = http_status
vcp_server_error(http_status, "resume unit failed", response_body)
return True
def stop(self):
"""
Unit Stop(CloudInstance terminate)
"""
if self.type != "compute":
# operation only compute
raise Exception("VCP", "stop only compute unit")
add_uri = "/vcs/" + self.vc.vcid + "/" + self.name + "/stop"
results = api_call(self.occtr, add_uri, req_type="PATCH")
http_status = results[0]
response_body = results[1]
if http_status != 202:
self.last_error_http_status = http_status
vcp_server_error(http_status, "stop unit failed", response_body)
return True
def start(self):
"""
Unit Stop(CloudInstance create)
"""
if self.type != "compute":
# operation only compute
raise Exception("VCP", "start only compute unit")
add_uri = "/vcs/" + self.vc.vcid + "/" + self.name + "/start"
results = api_call(self.occtr, add_uri, req_type="PATCH")
http_status = results[0]
response_body = results[1]
if http_status != 202:
self.last_error_http_status = http_status
vcp_server_error(http_status, "start unit failed", response_body)
return True
def watch(self):
    """
    Request the ``watch`` operation for this unit.

    Sends a PATCH request to the unit's ``/watch`` endpoint. Any status
    other than 202 is recorded in ``last_error_http_status`` and passed
    to ``vcp_server_error``.

    :return: True
    :raises Exception: ("VCP", "watch only compute unit") when the unit
        type is not "compute"
    """
    if self.type != "compute":
        # the operation is offered for compute units only
        raise Exception("VCP", "watch only compute unit")

    endpoint = "/".join(("/vcs", self.vc.vcid, self.name, "watch"))
    reply = api_call(self.occtr, endpoint, req_type="PATCH")
    status, body = reply[0], reply[1]
    if status == 202:
        return True

    # keep the failing status available to the caller before reporting it
    self.last_error_http_status = status
    vcp_server_error(status, "watch unit failed", body)
    return True
def unwatch(self):
    """
    Request the ``unwatch`` operation for this unit.

    Sends a PATCH request to the unit's ``/unwatch`` endpoint. Any status
    other than 202 is recorded in ``last_error_http_status`` and passed
    to ``vcp_server_error``.

    :return: True
    :raises Exception: ("VCP", "unwatch only compute unit") when the unit
        type is not "compute"
    """
    if self.type != "compute":
        # the operation is offered for compute units only
        raise Exception("VCP", "unwatch only compute unit")

    endpoint = "/".join(("/vcs", self.vc.vcid, self.name, "unwatch"))
    reply = api_call(self.occtr, endpoint, req_type="PATCH")
    status, body = reply[0], reply[1]
    if status == 202:
        return True

    # keep the failing status available to the caller before reporting it
    self.last_error_http_status = status
    vcp_server_error(status, "unwatch unit failed", body)
    return True
def add_server(self, ip_address_list):
    """
    Add existing servers to this unit.

    The servers must already accept ssh logins and be able to run a
    docker daemon.

    :param ip_address_list: list (or tuple) of IP addresses of the
        existing servers
    :return: True
    :raises Exception: ("VCP", "ip_address_list must array") when
        ``ip_address_list`` is not a list or tuple
    """
    # isinstance is required here: comparing type(...) with the string
    # "array" can never match, which would reject every call.
    if not isinstance(ip_address_list, (list, tuple)):
        raise Exception("VCP", "ip_address_list must array")

    add_uri = "/vcs/" + self.vc.vcid + "/" + self.name
    params = {"ip_address_list": list(ip_address_list)}
    results = api_call(self.occtr, add_uri, req_type="PUT", params=params)
    http_status = results[0]
    response_body = results[1]
    if http_status != 202:
        self.last_error_http_status = http_status
        vcp_server_error(http_status, "add_server unit failed", response_body)

    # reload the unit information after the change
    self.config()
    return True
def __str__(self):
    """
    Build a short text summary of the unit.

    The summary holds the class name, then type/name/state, then the
    error message block when ``error_message`` is not empty.
    """
    pieces = [
        "[{}]\n".format(self.__class__.__name__),
        "+ type[{type}] name[{name}] state[{unit_state}]\n".format(
            type=self.type, name=self.name, unit_state=self.state
        ),
    ]
    # The base64-encoded CCI (unit_info) is intentionally not printed.
    if len(self.error_message):
        pieces.append("*** HAS ERROR:\n---\n{}\n---\n".format(self.error_message))
    pieces.append("\n")
    return "".join(pieces)
def nodes_df(self, cols=None):
    """
    Return the nodes of this unit as a pandas DataFrame.

    :param cols: column names to output, in the desired order. When
        omitted or empty, a default column set is chosen from the unit
        type ("compute" or anything else, treated as storage).
    :return: DataFrame with one row per node, restricted to ``cols``
    """
    if cols is None or len(cols) == 0:
        if self.type == "compute":
            cols = [
                "vcno",
                "vcname",
                "unit_name",
                "unit_state",
                "node_no",
                "node_id",
                "node_state",
                "cloud_instance_address",
                "cloud_instance_id",
                "cloud_instance_name",
                "volumes",
            ]
        else:
            cols = [
                "vcno",
                "vcname",
                "unit_name",
                "unit_state",
                "disk_no",
                "disk_id",
                "node_state",
                "cloud_disk_id",
                "cloud_disk_size",
            ]
    # a list is needed for the final column selection
    cols = list(cols)

    # Columns available for every node, whatever its type.
    common = {
        "vc_state": lambda node: self.vc.state,
        "vcno": lambda node: self.vc.vcno,
        "vcname": lambda node: self.vc.name,
        "vcid": lambda node: vcp_short_id(self.vc.vcid),
        "unit_name": lambda node: self.name,
        "unit_state": lambda node: self.state,
        "node_state": lambda node: node.state,
    }
    # Columns filled only for compute nodes.
    compute_only = {
        "node_no": lambda node: node.no,
        "node_id": lambda node: vcp_short_id(node.id),
        "cloud_instance_address": lambda node: node.cloud_instance_address,
        "cloud_instance_id": lambda node: node.cloud_instance_id,
        # name format: VCP-{first 8 chars of vcc_id}-{first 8 chars of vcid}
        "cloud_instance_name": lambda node: "VCP-%s-%s"
        % (self.vc.vcc_id[:8], self.vc.vcid[:8]),
        "volumes": lambda node: "exists" if len(node.volumes) > 0 else "none",
    }
    # Columns filled only for non-compute (disk) nodes.
    storage_only = {
        "disk_no": lambda node: node.no,
        "disk_id": lambda node: vcp_short_id(node.id),
        # disk_state must fill its own column, not "node_state"
        "disk_state": lambda node: node.state,
        "cloud_disk_id": lambda node: node.cloud_disk_id,
        "cloud_disk_size": lambda node: node.cloud_disk_size,
    }

    ds = {col: [] for col in cols}
    for node in self.nodes:
        by_type = compute_only if node.type == "compute" else storage_only
        # iterate the dict (not cols) so a repeated column name is filled once
        for col, values in ds.items():
            getter = common.get(col) or by_type.get(col)
            if getter is not None:
                values.append(getter(node))

    df = pd.DataFrame(ds).reset_index(drop=True)
    return df[cols]
def unit_info(self):
    """
    Return the unit's CCI text as a base64 string.

    The text is read from ``self.unit["info"]`` and encoded as UTF-8
    before being base64 encoded.

    :return: base64 encoded CCI (str)
    """
    raw_cci = self.unit["info"].encode("utf-8")
    encoded = base64.b64encode(raw_cci)
    return encoded.decode()
def add_node(self, nums, ip_address_list=[], mac_address_list=[]):
    """
    Create nodes in this unit.

    :param nums: number of nodes; ignored when an address list is given,
        because the list length is used instead
    :param ip_address_list: IP addresses for the nodes to start (list)
    :param mac_address_list: MAC addresses for the nodes to start
        (list, optional); supported on VMware only
    :return: True
    :raises Exception: ("VCP", ...) when both lists are non-empty but
        differ in length
    """
    add_uri = "/".join(("/vcs", self.vc.vcid, self.name))

    # When both lists carry entries they are paired up, so their lengths
    # have to agree.
    mac_count = len(mac_address_list)
    ip_count = len(ip_address_list)
    if mac_count > 0 and ip_count > 0 and mac_count != ip_count:
        raise Exception(
            "VCP",
            "mac_address_list and ip_address_list has element but their length are different",
        )

    # An address list decides the node count; the IP list takes precedence.
    nums = ip_count or mac_count or nums

    payload = {
        "ip_address_list": ip_address_list,
        "mac_address_list": mac_address_list,
        "num_nodes": nums,
    }
    reply = api_call(self.occtr, add_uri, req_type="PUT", params=payload)
    status, body = reply[0], reply[1]
    if status != 202:
        self.last_error_http_status = status
        vcp_server_error(status, "add node failed", body)

    # reload the unit information after the change
    self.config()
    return True
def error(self):
    """
    Return the error information of this unit.

    When the unit has no error message of its own, the error information
    of the owning Vc is returned instead.
    """
    if len(self.error_message) != 0:
        return self.error_message
    # nothing recorded on the unit: ask the parent Vc
    return self.vc.error()
#
# VC API
#
class Vc:
    """
    Vc resource of the VCP library.

    Holds the properties of one VC as reported by the VC Controller,
    together with the VcUnit objects that belong to it.
    """

    def __init__(self, occtr, vc_json):
        """
        Build a Vc from VC information returned by the VC Controller.

        :param occtr: VCP library base object, handed to api_call
        :param vc_json: VC information (dict); "vcid" is required
        """
        self.type = ""  # compute | storage
        self.occtr = occtr
        self.vc = vc_json
        # Initialised up front so the attribute can be read before any API
        # call has failed (0 means no failure recorded yet).
        self.last_error_http_status = 0
        self.setup_property(vc_json)
        self._load_units(vc_json)

    def _load_units(self, vc_json):
        """Rebuild ``self.units`` from the "unit" entries of ``vc_json``."""
        self.units = []
        if "unit" in vc_json:
            # the VC information carries units: create a VcUnit for each
            for unit in vc_json["unit"]:
                self.units.append(VcUnit(self, unit))

    def setup_property(self, vc_json):
        """
        Store the VC information (json) obtained from the VC Controller
        in the properties of this object.

        - vcc_id ... VC Controller ID
        - type ... compute|storage
        - vcno ... VC no
        - vcid ... VC ID
        - name ... VC name
        - owner ... owner
        - state ... state
        - cdate ... creation date
        - error_message

        Every key except "vcid" is optional and falls back to "" (type
        falls back to "compute").

        :param vc_json: VC information (dict)
        :raises KeyError: when "vcid" is missing
        """
        # Up to 18.07.0 there was no "type" key and only compute existed.
        self.type = vc_json.get("type", "compute")  # compute|storage
        if self.type is None:
            self.type = "compute"
        self.vcc_id = vc_json.get("vcc_id", "")
        self.vcno = vc_json.get("vcno", "")
        self.vcid = vc_json["vcid"]
        self.name = vc_json.get("name", "")
        self.owner = vc_json.get("owner", "")
        self.state = vc_json.get("state", "")
        self.cdate = vc_json.get("cdate", "")
        self.error_message = vc_json.get("error_message", "")

    def config(self):
        """
        Reload the VC information from the VC Controller.

        :return: True when the VC was found (HTTP 200), otherwise False
            after resetting the properties to a "not found" state
        """
        add_uri = "/vcs/" + self.vcid
        results = api_call(self.occtr, add_uri)
        http_status = results[0]
        response_body = results[1]
        if http_status not in (200, 404):
            self.last_error_http_status = http_status
            vcp_server_error(http_status, "config vc failed", response_body)

        if http_status == 200:
            vc_json = json.loads(response_body)
            self.setup_property(vc_json)
            self._load_units(vc_json)
            return True

        # The VC was deleted or the ID is invalid.
        # NOTE(review): setup_property reads "type", not "vc_type", so the
        # "vc_type" entry is ignored and type becomes "compute" - confirm
        # whether an empty type was intended.
        self.setup_property(
            {"vcid": self.vcid, "vc_type": "", "error_message": "not found"}
        )
        self.units = []
        return False

    def update(self, cci=""):
        """
        Update the VC with the given CCI.

        :param cci: CCI text; sent base64 encoded, omitted when empty
        :return: True
        """
        add_uri = "/vcs/" + self.vcid
        params = {}
        if len(cci) > 0:
            cci_base64 = base64.b64encode(cci.encode("utf-8")).decode()
            params = {"cci": cci_base64}
        results = api_call(self.occtr, add_uri, req_type="PUT", params=params)
        http_status = results[0]
        response_body = results[1]
        if http_status != 202:
            self.last_error_http_status = http_status
            vcp_server_error(http_status, "update vc failed", response_body)
        self.config()
        return True

    def delete(self, force=False):
        """
        Delete the VC.

        :param force: when true, delete even if units and nodes still
            exist under the VC
        :return: True
        """
        add_uri = "/vcs/" + self.vcid
        # the force option is sent as the string "true"/"false"
        params = {"force": "true" if force else "false"}
        results = api_call(self.occtr, add_uri, req_type="DELETE", params=params)
        http_status = results[0]
        response_body = results[1]
        if http_status != 202:
            self.last_error_http_status = http_status
            vcp_server_error(http_status, "delete vc failed", response_body)
        return True

    def change_owner(self, new_owner):
        """
        Change the owner of the VC.

        :param new_owner: name of the new owner
        :return: True
        """
        add_uri = "/vcs/" + self.vcid
        params = {"owner": new_owner, "operation": "change_owner"}
        results = api_call(self.occtr, add_uri, req_type="PATCH", params=params)
        http_status = results[0]
        response_body = results[1]
        if http_status != 202:
            self.last_error_http_status = http_status
            vcp_server_error(http_status, "change owner vc failed", response_body)
        return True

    def add_unit(self, unit_name, info):
        """
        Create (add) a unit in the VC.

        :param unit_name: name of the new unit
        :param info: CCI text for the new unit; sent base64 encoded
        :return: True
        """
        add_uri = "/vcs/" + self.vcid
        cci_base64 = base64.b64encode(info.encode("utf-8")).decode()
        params = {"info": cci_base64, "unit_name": unit_name}
        results = api_call(self.occtr, add_uri, req_type="POST", params=params)
        http_status = results[0]
        response_body = results[1]
        if http_status != 202:
            self.last_error_http_status = http_status
            vcp_server_error(http_status, "add unit failed", response_body)
        self.config()
        return True

    def __str__(self):
        """
        Build a text summary of the VC, followed by the summary of every
        unit whose state is "ERROR".
        """
        res = "[{}]\n".format(self.__class__.__name__)
        res += "+ type[{type}] name[{name}] owner({owner}) vcno({vcno}) state[{state}] vcid[{vcid}]\n".format(
            type=self.type,
            name=self.name,
            owner=self.owner,
            vcno=self.vcno,
            state=self.state,
            vcid=self.vcid,
        )
        # show the error information of the units under this VC
        for unit in self.units:
            if unit.state == "ERROR":
                res += "{}".format(unit)
        res += "\n"
        return res

    def units_df(self, ds=None):
        """
        Return the units of this VC as a pandas DataFrame.

        :param ds: optional dict of column lists to append to; it must
            already hold all eight output columns. A fresh dict is used
            when omitted or empty.
        :return: DataFrame with one row per unit
        """
        if not ds:
            ds = {
                "vctype": [],
                "vc_state": [],
                "vcno": [],
                "vcname": [],
                "vcid": [],
                "cdate": [],
                "unit_name": [],
                "unit_state": [],
            }
        for unit in self.units:
            ds["vc_state"].append(self.state)
            ds["vctype"].append(self.type)
            ds["vcno"].append(self.vcno)
            ds["vcid"].append(vcp_short_id(self.vcid))
            ds["cdate"].append(self.cdate)
            ds["vcname"].append(self.name)
            ds["unit_name"].append(unit.name)
            ds["unit_state"].append(unit.state)
        df = pd.DataFrame(ds).reset_index(drop=True)
        return df[
            [
                "vctype",
                "vcno",
                "vcname",
                "vc_state",
                "vcid",
                "cdate",
                "unit_name",
                "unit_state",
            ]
        ]

    def error(self):
        """
        Return the error information of the Vc.
        """
        return self.error_message
#
# Base
#
class Occtr:
    """
    Base class of the VCP library.

    Holds the VC Controller API endpoint and access token, keeps the list
    of Vc objects, and provides the controller-level API calls.
    """

    def __init__(
        self,
        vcc_access_token="",
        vcc_host="localhost",
        verbose=0,
        insecure_request_warning=True,
    ):
        """
        Initialise the library state and load the VC list.

        - verbose
        - last_error_http_status
        - VC Controller API endpoint
        - access token for API authentication

        :param vcc_access_token: access token; surrounding whitespace is
            stripped
        :param vcc_host: host of the VC Controller
        :param verbose: verbosity level read by api_call
        :param insecure_request_warning: stored as is; api_call passes it
            to requests as ``verify``
        """
        self.verbose = verbose
        # privileged mode is switched off whenever the SDK is initialised
        unset_privilege_mode(privileged=True)
        self.last_error_http_status = 0
        # VC Controller API end point
        self.api_end_point = "%s://%s/vcp/v1/occtr" % (VCP_SCHEMA, vcc_host)
        # access token used for authentication
        self.vcc_access_token = vcc_access_token.strip()
        self.insecure_request_warning = insecure_request_warning
        # fetch the Occtr information through the API
        self.config()

    def config(self):
        """
        (Re)load the information from the controller and rebuild
        ``self.vcs`` with one Vc object per VC.

        :return: True
        """
        # root endpoint first
        results = api_call(self, "")
        http_status = results[0]
        response_body = results[1]
        if http_status != 200:
            self.last_error_http_status = http_status
            vcp_server_error(http_status, "config vc failed", response_body)

        # then the VC list
        results = api_call(self, "/vcs")
        http_status = results[0]
        response_body = results[1]
        if http_status != 200:
            self.last_error_http_status = http_status
            vcp_server_error(http_status, "config vc failed", response_body)

        # build the array of Vc objects
        vc_json = json.loads(response_body)["vcs"]
        self.vc_json = vc_json
        self.vcs = []
        for vc in vc_json:
            self.vcs.append(Vc(self, vc))
        return True

    def vpn_catalog(self, provider=None, vpn_catalog_name="default"):
        """
        Fetch the current vpn_catalog information from the VC Controller.

        :param provider: provider key to look up; None returns the whole
            catalog with "cci_version" removed
        :param vpn_catalog_name: catalog entry name under the provider
        :return: the whole catalog, or the selected provider entry
        :raises Exception: ("VCP", ...) when the provider or the entry
            name does not exist in the catalog
        """
        add_uri = "/vpn_catalog"
        results = api_call(self, add_uri)
        http_status = results[0]
        response_body = results[1]
        if http_status != 200:
            self.last_error_http_status = http_status
            vcp_server_error(http_status, "vpn_catalog failed", response_body)

        vpn_catalog = json.loads(response_body)["vpn_catalog"]
        if provider is None:
            # whole catalog, without cci_version
            if "cci_version" in vpn_catalog:
                del vpn_catalog["cci_version"]
            return vpn_catalog

        if provider in vpn_catalog:
            if vpn_catalog_name in vpn_catalog[provider]:
                return vpn_catalog[provider][vpn_catalog_name]

        raise Exception(
            "VCP",
            "not exist in vpn_catalog, provider[{}] vpn_catalog_name[{}]".format(
                provider, vpn_catalog_name
            ),
        )

    def healthcheck(self):
        """
        Health check of the VC Controller.

        :return: True when the controller answers HTTP 200, else False
        """
        results = api_call(self, "/")
        http_status = results[0]
        if http_status != 200:
            self.last_error_http_status = http_status
            return False
        return True

    def authority(self):
        """
        Authentication / authorisation check for the API user.

        Asks the controller about the account of the configured access
        token.

        :return: the response body on HTTP 200
        """
        results = api_call(self, "/accounts/authority")
        http_status = results[0]
        response_body = results[1]
        if http_status != 200:
            self.last_error_http_status = http_status
            vcp_server_error(http_status, "authority error", response_body)
        else:
            return response_body

    def cci_template_list(self):
        """
        `nouse`
        Fetch the list of container cluster templates (CCT).

        :return: decoded JSON response
        """
        results = api_call(self, "/ccts")
        http_status = results[0]
        response_body = results[1]
        if self.verbose >= 2:
            logger.debug(response_body)
        if http_status != 200:
            self.last_error_http_status = http_status
            vcp_server_error(http_status, "cci template list error", response_body)
        ccts = json.loads(response_body)
        return ccts

    @staticmethod
    def _write_cci_file(filename, cci):
        """Write the CCI text to ``filename`` as UTF-8."""
        with open(filename, "w", encoding="utf-8") as cci_file:
            cci_file.write(cci)

    def cci_candidate(self, name, filename):
        """
        `nouse`
        Generate a container cluster information candidate (CCT) and
        write it to a file.

        :param name: candidate name
        :param filename: output file name
        :return: True when the candidate was written
        """
        # NOTE(review): the result of this authority request is not used;
        # the call is kept only to preserve the existing request sequence.
        api_call(self, "/accounts/authority")

        results = api_call(self, "/ccis/candidate?name=" + name)
        http_status = results[0]
        response_body = results[1]
        if http_status != 200:
            self.last_error_http_status = http_status
            vcp_server_error(http_status, "cci candidate failed", response_body)
        else:
            # The CCI is text, so it is written through a text-mode file
            # opened as UTF-8; writing encoded bytes to a text-mode file
            # raises TypeError.
            cci_json = json.loads(response_body)
            self._write_cci_file(filename, cci_json["cci"])
            return True

    def publickey(self):
        """
        Fetch the SSH public key used to log in to existing servers.

        :return: value of "publickey" in the response
        """
        results = api_call(self, "/publickey")
        http_status = results[0]
        response_body = results[1]
        if http_status != 200:
            self.last_error_http_status = http_status
            vcp_server_error(http_status, "publickey failed", response_body)
        publickey = json.loads(response_body)["publickey"]
        return publickey

    def occtr_version(self):
        """
        Fetch the version information of the OC Controller API.

        :return: decoded JSON response
        """
        results = api_call(self, "/version")
        http_status = results[0]
        response_body = results[1]
        if http_status != 200:
            self.last_error_http_status = http_status
            vcp_server_error(http_status, "oc controller version failed", response_body)
        version_json = json.loads(response_body)
        return version_json

    def create_vc(self, cci=""):
        """
        Create a new VC from the given CCI.

        :param cci: CCI text; sent base64 encoded, omitted when empty
        :return: the Vc object of the new VC
        :raises Exception: ("VCP", ...) when the new VC is not found
            after reloading the VC list
        """
        add_uri = "/vcs"
        params = {}
        if len(cci) > 0:
            cci_base64 = base64.b64encode(cci.encode("utf-8")).decode()
            params = {"cci": cci_base64}
        results = api_call(self, add_uri, req_type="POST", params=params)
        http_status = results[0]
        response_body = results[1]
        if http_status != 202:
            self.last_error_http_status = http_status
            vcp_server_error(http_status, "create vc failed", response_body)
        new_vc_json = json.loads(response_body)

        # reload the VC list and return the object of the VC just created
        self.config()
        for vc in self.vcs:
            if vc.vcid == new_vc_json["vcid"]:
                return vc
        raise Exception("VCP", "create vc failed: cannot find new vc")

    def vcs_df(self):
        """
        Return the VC list in DataFrame form.

        :return: result of ``vcp_df_color`` applied to the VC DataFrame
        """
        ds = {
            "vctype": [],
            "vcname": [],
            "owner": [],
            "state": [],
            "vcno": [],
            "vcid": [],
            "cdate": [],
        }
        for vc in self.vcs:
            ds["vctype"].append(vc.type)
            ds["vcname"].append(vc.name)
            ds["owner"].append(vc.owner)
            ds["state"].append(vc.state)
            ds["vcno"].append(vc.vcno)
            ds["vcid"].append(vcp_short_id(vc.vcid))
            ds["cdate"].append(vc.cdate)
        df = pd.DataFrame(ds).reset_index(drop=True)
        df = df[["vcno", "vctype", "owner", "vcname", "state", "vcid", "cdate"]]
        return vcp_df_color(df)

    def version(self, output=True):
        """
        Return the version information of the VCP library.

        :param output: when True, also print the information
        :return: dict with "filename" and "version"
        """
        if output is True:
            print(
                "\nvcplib:\nfilename: {filename}\nversion: {version}".format(
                    filename=__file__, version=VCP_LIB_VERSION
                )
            )
        return {"filename": __file__, "version": VCP_LIB_VERSION}
if __name__ == "__main__":
    # Manual smoke run against a controller; the access key is taken from
    # the VC_ACCESS_KEY environment variable.
    vcc_host = "192.168.1.1"
    vc_access_key = os.environ["VC_ACCESS_KEY"]
    oc = Occtr(vc_access_key, vcc_host=vcc_host, verbose=0)

    #
    # Auth
    #
    # authority() returns the response body, never the literal True, so
    # the result is checked for truthiness.
    if oc.authority():
        logger.debug("authority OK")

    #
    # HealthCheck
    #
    if oc.healthcheck():
        logger.debug("health check OK")
    logger.debug(datetime.now().strftime("%Y/%m/%d %H:%M:%S"))

    #
    # Version
    #
    print(oc.occtr_version())