// Copyright (c) 2021. Huawei Technologies Co., Ltd. All rights reserved.
// Package common defines common utilities.
package common
import (
"bufio"
"context"
"errors"
"io"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
"github.com/gin-gonic/gin"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"huawei.com/mindxdl/base/common/constants"
"huawei.com/npu-exporter/hwlog"
)
// logChanMap holds the stop channel of each log-recording goroutine, keyed by
// "<srcModule>-<svcName>". StartLogRecord creates the map lazily and adds
// entries; StopLogRecord signals, closes and removes them.
// NOTE(review): the map is read and written without a lock in this file —
// confirm that StartLogRecord/StopLogRecord are never called concurrently.
var logChanMap map[string]chan bool
// LogQueryResp is the payload returned for a log query.
type LogQueryResp struct {
// LogContent is the selected log lines joined with "\n".
LogContent string `json:"logContent"`
// RowNumber is the number of log lines contained in LogContent.
RowNumber uint64 `json:"rowNumber"`
}
const (
// checkServicePeriod is the number of seconds logRecordGoroutine sleeps
// between two refreshes of the pod list.
checkServicePeriod = 5
// byteToMB is the right-shift count applied to a size in bytes to obtain
// MiB (size >> 20).
byteToMB = 20
// maxLogSize is the largest log file, in MiB, that downloadLogFile will
// serve; logQuery also uses it as the initial capacity of its line slice.
maxLogSize = 50
// FileMode is the permission passed to os.OpenFile for log files.
FileMode = 0640
// FolderMode is the permission passed to os.MkdirAll for log folders.
FolderMode = 0750
// runningStatus and succeededStatus are the pod phases whose logs are
// recorded; pods in any other phase are ignored by getNewPodNameList.
runningStatus = "Running"
succeededStatus = "Succeeded"
// LogFolder is the directory, below the user storage path, that holds the
// per-namespace service log folders.
LogFolder = "ServiceLogs"
)
// GetLogFilePath returns the path of the log file that belongs to podName.
// The elem arguments are joined into the parent directory and the file is
// named "<podName>.log"; when no elem is given only the file name is returned.
func GetLogFilePath(podName string, elem ...string) string {
	parentDir := path.Join(elem...)
	return path.Join(parentDir, podName+".log")
}
// getStoragePath logs which component is asking for storage and returns the
// path that GetUserPath resolves for the request context b.
func getStoragePath(b BaseCtx, srcModule string) string {
	hwlog.RunLog.Infof("current component is %v", srcModule)
	storagePath := GetUserPath(b)
	return storagePath
}
// getLogParentPath returns the folder that holds the log files of namespace:
// <storage path>/<LogFolder>/<namespace>.
func getLogParentPath(b BaseCtx, srcModule, namespace string) string {
	storageRoot := getStoragePath(b, srcModule)
	return path.Join(storageRoot, LogFolder, namespace)
}
// getPodLogFilePath returns the full path of the log file of podName inside
// the log folder of namespace.
func getPodLogFilePath(b BaseCtx, srcModule, namespace, podName string) string {
	parentDir := getLogParentPath(b, srcModule, namespace)
	return GetLogFilePath(podName, parentDir)
}
// logToFile copies the pod log stream podLog into a newly created file at
// logFilePath and calls wg.Done when it returns.
//
// If logFilePath already exists the function returns without writing, so an
// existing log is never overwritten. The stream podLog is only read here; it
// is not closed by this function. All failures are logged, none is returned.
func logToFile(podLog io.ReadCloser, logFilePath string, wg *sync.WaitGroup) {
	defer wg.Done()
	fileName := path.Base(logFilePath)

	// O_EXCL makes "create only when absent" a single atomic step instead of
	// a separate Stat followed by a create.
	logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_EXCL|os.O_WRONLY|os.O_APPEND, FileMode)
	if err != nil {
		if os.IsExist(err) {
			// The file is already there: leave it untouched.
			return
		}
		hwlog.RunLog.Errorf("create %s log file failed, err: %v", fileName, err)
		return
	}
	defer func() {
		if err := logFile.Close(); err != nil {
			hwlog.RunLog.Errorf("close %s log file failed, err: %v", fileName, err)
		}
	}()

	logWriter := bufio.NewWriter(logFile)
	if _, err := io.Copy(logWriter, podLog); err != nil {
		hwlog.RunLog.Errorf("write log file failed, path: %v, err: %v", fileName, err)
	}
	// Flush even after a copy error so that whatever was buffered reaches the file.
	if err := logWriter.Flush(); err != nil {
		hwlog.RunLog.Errorf("flush %s log file failed, err: %v", fileName, err)
	}
}
// closePodLogStream closes every log stream held in podLogList. A failed
// close is logged with the pod name and does not prevent the remaining
// streams from being closed.
func closePodLogStream(podLogList map[string]io.ReadCloser) {
	for name, stream := range podLogList {
		closeErr := stream.Close()
		if closeErr == nil {
			continue
		}
		hwlog.RunLog.Errorf("close %s log reader failed", name)
	}
}
// deleteOldLogFile removes, from logDir, the log file of every pod named in
// oldPodNameList. A file that cannot be removed is logged and skipped.
func deleteOldLogFile(logDir string, oldPodNameList []string) {
	for i := range oldPodNameList {
		name := oldPodNameList[i]
		logFile := GetLogFilePath(name, logDir)
		removeErr := os.Remove(logFile)
		if removeErr == nil {
			continue
		}
		hwlog.RunLog.Errorf("remove log file failed, name: %v err: %v, please delete it manually",
			name, removeErr)
	}
}
// findFirst reports the position of the first entry of list that equals
// element, or -1 when list does not contain it.
func findFirst(element string, list []string) int {
	for idx := 0; idx < len(list); idx++ {
		if list[idx] == element {
			return idx
		}
	}
	return -1
}
// getNewPodNameList return newly added pod name list
// Only will deleted pod names remain in oldPodNameList after this function
func getNewPodNameList(clientSet *kubernetes.Clientset, svcNamespace, svcName string,
oldPodNameList []string) ([]string, []string) {
labelSelector := labels.Set(map[string]string{"app": svcName}).AsSelector().String()
podList, err := clientSet.CoreV1().Pods(svcNamespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return nil, nil
}
var podNameList []string
for _, pod := range podList.Items {
if pod.Status.Phase == runningStatus || pod.Status.Phase == succeededStatus {
if index := findFirst(pod.Name, oldPodNameList); index == -1 {
podNameList = append(podNameList, pod.Name)
} else {
oldPodNameList = append(oldPodNameList[:index], oldPodNameList[index+1:]…)
}
}
}
return podNameList, oldPodNameList
}
// updatePodLogList brings podLogList in line with the current pods: it opens
// a following log stream for every pod in podNameList and stores it under the
// pod name, then closes and removes the stream of every pod in
// oldPodNameList.
//
// A stream that cannot be opened is logged and left out of the map. A pod in
// oldPodNameList that has no stream in the map is skipped instead of causing
// a nil-interface panic.
func updatePodLogList(clientSet *kubernetes.Clientset, svcNamespace string, podNameList, oldPodNameList []string,
	podLogList map[string]io.ReadCloser) {
	if podLogList == nil {
		// Avoids a write to a nil map below. The new map is local to this
		// call: the caller does not see it.
		podLogList = make(map[string]io.ReadCloser, len(podNameList))
	}
	opt := &corev1.PodLogOptions{
		Follow: true,
	}
	for _, podName := range podNameList {
		logRequest := clientSet.CoreV1().Pods(svcNamespace).GetLogs(podName, opt)
		podLog, err := logRequest.Stream(context.TODO())
		if err != nil {
			hwlog.RunLog.Errorf("open %s log stream failed, err: %v", podName, err)
			continue
		}
		podLogList[podName] = podLog
	}
	for _, podName := range oldPodNameList {
		if podLog, ok := podLogList[podName]; ok && podLog != nil {
			if err := podLog.Close(); err != nil {
				hwlog.RunLog.Errorf("close %s log reader failed", podName)
			}
		}
		delete(podLogList, podName)
	}
}
func logRecordGoroutine(svcNameSpace, svcName, logDir string, stopCh <-chan bool) {
var clientSet *kubernetes.Clientset
var cliErr error
podLogList := make(map[string]io.ReadCloser)
var podNameList, oldPodNameList []string
var wg sync.WaitGroup
for {
select {
case <-stopCh:
wg.Wait()
closePodLogStream(podLogList)
deleteOldLogFile(logDir, oldPodNameList)
return
default:
}
if clientSet == nil {
if clientSet, cliErr = K8sClient(""); cliErr != nil {
continue
}
}
podNameList, oldPodNameList = getNewPodNameList(clientSet, svcNameSpace, svcName, oldPodNameList)
updatePodLogList(clientSet, svcNameSpace, podNameList, oldPodNameList, podLogList)
deleteOldLogFile(logDir, oldPodNameList)
oldPodNameList = nil
for podName := range podLogList {
oldPodNameList = append(oldPodNameList, podName)
}
for \_, podName := range podNameList {
wg.Add(1)
if \_, ok := podLogList\[podName\]; ok {
go logToFile(podLogList\[podName\], GetLogFilePath(podName, logDir), &wg)
}
}
time.Sleep(checkServicePeriod \* time.Second)
}
}
// StartLogRecord creates the log folder of svcNamespace and starts a
// goroutine that records the logs of service svcName into it. The stop
// channel of the goroutine is stored in logChanMap under
// "<srcModule>-<svcName>". If the folder cannot be created a warning is
// logged and nothing is started.
// NOTE(review): an existing entry with the same key is overwritten, after
// which the earlier goroutine can no longer be reached through logChanMap.
func StartLogRecord(svcNamespace, svcName, srcModule string, b BaseCtx) {
	logParentDir := getLogParentPath(b, srcModule, svcNamespace)
	mkdirErr := os.MkdirAll(logParentDir, FolderMode)
	if mkdirErr != nil {
		hwlog.OpLog.WarnfWithCtx(b.Ctx, "log record failed: create the log folder failed, %s", mkdirErr.Error())
		return
	}
	if logChanMap == nil {
		logChanMap = make(map[string]chan bool)
	}
	stopCh := make(chan bool, 1)
	logChanMap[srcModule+"-"+svcName] = stopCh
	hwlog.RunLog.InfofWithCtx(b.Ctx, "start to record log of service %v", svcName)
	go logRecordGoroutine(svcNamespace, svcName, logParentDir, stopCh)
}
// StopLogRecord signals the log-recording goroutine registered under
// "<srcModule>-<svcName>" to stop, closes its channel and removes the entry
// from logChanMap. When no such entry exists only the log line is written.
func StopLogRecord(svcName, srcModule string, b BaseCtx) {
	key := srcModule + "-" + svcName
	// A missing key yields a nil channel, so one lookup covers both checks.
	if stopCh := logChanMap[key]; stopCh != nil {
		stopCh <- true
		close(stopCh)
		delete(logChanMap, key)
	}
	hwlog.RunLog.InfofWithCtx(b.Ctx, "stop to record log of service %v", svcName)
}
func logQuery(logPath string, offset, limit uint64, b BaseCtx) (*LogQueryResp, error) {
logFile, err := os.OpenFile(logPath, os.O_RDONLY, FileMode)
if err != nil {
hwlog.RunLog.ErrorWithCtx(b.Ctx, "Fail to get log file")
return nil, err
}
defer logFile.Close()
logReader := bufio.NewReader(logFile)
logStrs := make([]string, 0, maxLogSize)
var logLineByte []byte
for line := uint64(0); line < offset+limit; line++ {
lineByte, isPrefix, err := logReader.ReadLine()
if err != nil {
if err != io.EOF {
hwlog.RunLog.ErrorfWithCtx(b.Ctx, "An error occurred while reading log file: %v", err)
return nil, err
}
break
}
logLineByte = append(logLineByte, lineByte…)
if isPrefix {
continue
}
if line >= offset {
logStrs = append(logStrs, string(logLineByte))
}
logLineByte = make([]byte, 0)
}
logQueryResp := LogQueryResp{
LogContent: strings.Join(logStrs, "\n"),
RowNumber: uint64(len(logStrs)),
}
return &logQueryResp, nil
}
// getOffsetAndLimit parses the textual query parameters logOffset and
// logLimit as unsigned integers and checks them with validLogLimitOffset.
// It returns the parsed offset and limit with the code Success, or zeros with
// ParamConvert2IntegerFailed (a value is not a number) or ParamInvalid (the
// pair fails validation).
func getOffsetAndLimit(logOffset, logLimit string, b BaseCtx) (uint64, uint64, string) {
	offset, offsetErr := strconv.ParseUint(logOffset, BaseHex, BitSize64)
	if offsetErr != nil {
		hwlog.OpLog.ErrorfWithCtx(b.Ctx, "get service log offset convert to integer failed, err: %v", offsetErr)
		return 0, 0, ParamConvert2IntegerFailed
	}
	limit, limitErr := strconv.ParseUint(logLimit, BaseHex, BitSize64)
	if limitErr != nil {
		hwlog.OpLog.ErrorfWithCtx(b.Ctx, "get service log limit convert to integer failed, err: %v", limitErr)
		return 0, 0, ParamConvert2IntegerFailed
	}
	checkErr := validLogLimitOffset(offset, limit)
	if checkErr == nil {
		return offset, limit, Success
	}
	hwlog.OpLog.ErrorfWithCtx(b.Ctx, "get service log param invalid: %v", checkErr)
	return 0, 0, ParamInvalid
}
// validPodNameNamespace checks that podName and podNamespace are valid names,
// that the pod exists in the cluster, and that the service it is labelled
// with is accepted by search for the requesting user.
//
// A pod carrying the constants.TrainManageName label is looked up by its
// "dbjob-name" label together with that label's value as service type; any
// other pod is looked up by its "app" label with an empty service type.
// It returns "" when every check passes, GetK8sClientFailed when no client
// could be obtained, and ParamInvalid otherwise.
func validPodNameNamespace(podName, podNamespace string,
	search func(name, svcType string, uid uint64) error, b BaseCtx) string {
	if !ValidName("podName", podName, b) {
		hwlog.RunLog.ErrorWithCtx(b.Ctx, "podName invalid")
		return ParamInvalid
	}
	if !ValidName("namespace", podNamespace, b) {
		hwlog.RunLog.ErrorWithCtx(b.Ctx, "pod namespace invalid")
		return ParamInvalid
	}
	clientSet, cliErr := K8sClient("")
	if cliErr != nil {
		hwlog.RunLog.ErrorfWithCtx(b.Ctx, "Failed to get customClient err: %s", cliErr.Error())
		return GetK8sClientFailed
	}
	pod, getErr := clientSet.CoreV1().Pods(podNamespace).Get(context.Background(),
		podName, metav1.GetOptions{})
	if getErr != nil {
		hwlog.RunLog.ErrorWithCtx(b.Ctx, "the pod does not exist")
		return ParamInvalid
	}

	svcType, isTrainPod := pod.Labels[constants.TrainManageName]
	if isTrainPod {
		jobName, found := pod.Labels["dbjob-name"]
		if !found {
			hwlog.RunLog.ErrorWithCtx(b.Ctx, "failed to get service name of the pod")
			return ParamInvalid
		}
		if searchErr := search(jobName, svcType, b.HdInfo.UserID); searchErr != nil {
			hwlog.RunLog.ErrorWithCtx(b.Ctx, "the pod does not belong to train manager")
			return ParamInvalid
		}
		return ""
	}

	appName, found := pod.Labels["app"]
	if !found {
		hwlog.RunLog.ErrorWithCtx(b.Ctx, "failed to get service name of the pod")
		return ParamInvalid
	}
	if searchErr := search(appName, "", b.HdInfo.UserID); searchErr != nil {
		hwlog.RunLog.ErrorfWithCtx(b.Ctx, "the pod does not belong to task manager, %v", searchErr)
		return ParamInvalid
	}
	return ""
}
// QueryServiceLog handles a log query request. It reads the query parameters
// "offset", "limit", "podName" and "namespace", validates them, reads the
// requested lines from the recorded log file of the pod and writes the
// result, or an error code, through ConstructResp.
// NOTE(review): any non-empty code from validPodNameNamespace is answered
// with ParamInvalid, including GetK8sClientFailed — confirm this is intended.
func QueryServiceLog(srcModule string,
	search func(name, svcType string, uid uint64) error, b BaseCtx, c *gin.Context) {
	hwlog.OpLog.InfoWithCtx(b.Ctx, "start to query service log")
	rawOffset, rawLimit := c.Query("offset"), c.Query("limit")
	offset, limit, errCode := getOffsetAndLimit(rawOffset, rawLimit, b)
	if errCode != Success {
		ConstructResp(c, errCode, "", nil)
		return
	}
	podName, podNamespace := c.Query("podName"), c.Query("namespace")
	if code := validPodNameNamespace(podName, podNamespace, search, b); code != "" {
		ConstructResp(c, ParamInvalid, "", nil)
		return
	}
	logPath := getPodLogFilePath(b, srcModule, podNamespace, podName)
	result, queryErr := logQuery(logPath, offset, limit, b)
	if queryErr != nil {
		hwlog.OpLog.ErrorfWithCtx(b.Ctx, "query service pod log failed, err: %v", queryErr)
		ConstructResp(c, QueryK8sPodLogFailed, "", nil)
		return
	}
	hwlog.OpLog.InfoWithCtx(b.Ctx, "query pod log succeed")
	ConstructResp(c, Success, "", result)
}
// downloadLogFile copies the log file at logPath to writer and returns the
// number of bytes copied. A file whose size exceeds maxLogSize MiB is refused
// with an error before it is opened. A failed Stat is ignored here; the
// subsequent open decides whether the file is usable.
func downloadLogFile(writer io.Writer, logPath string, b BaseCtx) (int64, error) {
	info, statErr := os.Stat(logPath)
	if statErr == nil && info.Size()>>byteToMB > maxLogSize {
		return 0, errors.New("the log file is too large, please download it by other way")
	}
	logFile, openErr := os.OpenFile(logPath, os.O_RDONLY, FileMode)
	if openErr != nil {
		hwlog.RunLog.ErrorWithCtx(b.Ctx, "Fail to get log file")
		return 0, openErr
	}
	defer logFile.Close()
	copied, copyErr := io.Copy(writer, logFile)
	return copied, copyErr
}
// DownloadServiceLog handles a log download request. It reads the query
// parameters "podName" and "namespace", validates them, and streams the
// recorded log file of the pod to the response as an attachment named
// "<podName>.log".
//
// The pod name and namespace are validated before any download header is
// written, so a rejected request is not answered as an attachment and an
// unvalidated pod name never reaches the Content-Disposition header.
func DownloadServiceLog(srcModule string,
	search func(name, svcType string, uid uint64) error, b BaseCtx, c *gin.Context) {
	podName := c.Query("podName")
	podNamespace := c.Query("namespace")
	if err := validPodNameNamespace(podName, podNamespace, search, b); err != "" {
		ConstructResp(c, ParamInvalid, "", nil)
		return
	}
	c.Header(ContentDisposition, "attachment; filename="+podName+".log")
	c.Header(ContentType, "application/text/plain")
	c.Header(AcceptLength, strconv.FormatInt(0, BaseHex))
	hwlog.OpLog.InfofWithCtx(b.Ctx, "get pod log podName(%v), namespace(%v)", podName, podNamespace)
	logPath := getPodLogFilePath(b, srcModule, podNamespace, podName)
	fileSize, err := downloadLogFile(c.Writer, logPath, b)
	if err != nil {
		hwlog.OpLog.ErrorfWithCtx(b.Ctx, "download service pod log failed, err: %v", err)
		ConstructResp(c, DownloadPodLogFileFailed, "", nil)
		return
	}
	// NOTE(review): the body has already been written to c.Writer at this
	// point, so this header update may come too late to reach the client, and
	// whatever ConstructResp writes below presumably follows the file content
	// in the response body — confirm against ConstructResp.
	c.Header(AcceptLength, strconv.FormatInt(fileSize, BaseHex))
	hwlog.OpLog.InfoWithCtx(b.Ctx, "pod log downloaded")
	ConstructResp(c, Success, "", nil)
}
// (Trailing web-page footer text; not part of the source file.)