混合编程python与C++
阅读原文时间:2023年09月01日阅读:1

上个版本: 只是用到ctypes进行传输, 这次将python服务端更改为C++服务端,方便后续维护.

本文实现功能: python传输图片给C++, C++接受图片后对图片进行处理,并将结果返回给python客户端, pass image from python to C++

.h文件

注意文中的model

// .h
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <signal.h>
#include <opencv2/opencv.hpp>

using namespace std;
using namespace cv;

class ModelManager;

class ServerManager
{
private:
    int m_port;
    char *m_addr;
    cv::VideoCapture m_cap;
    int m_server;
    int m_accept; // client conn
public:
    bool initialization(const int &port, const cv::VideoCapture &cap, char *addr = nullptr);
    bool initialization(const int &port, char *addr = nullptr);
    bool build_connect();
    bool acceptClient();
    void error_print(const char *ptr);
    bool free_connect();

    bool send_data_frame(ModelManager& model);
    bool receive_data_frame(cv::Mat &frame, ModelManager& model);
};

.cpp文件

#include "ServerManager.h"
#include "ModelManager.h"
#define BUFFER_SIZE 65538

void ServerManager::error_print(const char * ptr) {
        perror(ptr);
        exit(EXIT_FAILURE);
}

bool ServerManager::initialization(const int& port, const cv::VideoCapture& cap, char* addr){
    m_port = htons(port);
    m_addr = addr;
    m_cap = cap;
    return true;
}

bool ServerManager::initialization(const int& port, char* addr){
    m_port = htons(port);
    m_addr = addr;
    return true;
}

bool ServerManager::build_connect() {
    struct sockaddr_in server_addr;
    bzero(&server_addr,sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = m_addr?inet_addr(m_addr):INADDR_ANY;
    server_addr.sin_port = m_port;
    // create socket
    m_server = socket(AF_INET, SOCK_STREAM, 0);
    if(m_server < 0)
        error_print("socket bind error");
    // can reuse port
    int on = 1;
    if(setsockopt(m_server,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)) < 0)
        error_print("setsockopt error");
    // bind addr
    if(bind(m_server, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0)
        error_print("bind error");
    // listen only one client
    if(listen(m_server, 1) < 0)
        error_print("listen failed");

    cout << "ServerManager is listening, plesae wait..." << endl;

    return true;
}

bool ServerManager::acceptClient(){
    struct sockaddr_in accept_addr;
    socklen_t accept_len = sizeof(accept_addr);
    bzero(&accept_addr,sizeof(accept_addr));
    // accept client connection
    if((m_accept = accept(m_server,(struct sockaddr*)&accept_addr,&accept_len)) < 0)
        error_print("accept error");
    std::cout << "Connection established" << std::endl;
    return true;
}

bool ServerManager::send_data_frame(ModelManager& model) {
    char *json_output = nullptr;
    json_output = model.createJson();
    if (json_output == nullptr) {
        return false;
    }

    // printf("send data %s\n", json_output);
    // just send json_output, dont memcpy new char*!!! it wastes me two hours
    // send json
    int result = send(m_accept, json_output, strlen(json_output), 0);
    if (result == -1) {
        cout << "send fail" << endl;
        return false;
    }
    return true;
}

bool ServerManager::receive_data_frame(Mat& frame, ModelManager& model) {
    // recv frame size
    int data_size;
    if (recv(m_accept, &data_size, sizeof(data_size), 0) != sizeof(data_size)) {
        // when client close, then close connection
        close(m_accept);
        cout << "close connection to client" << endl;
        acceptClient(); // restart a new accept, to accept new connection
        return false;
    }
    cout << data_size << endl;
    // recv frame data

    // char buf[data_size];
    // std::vector<uchar> decode;
    // int bytes_received = 0;
    // do
    // {
    //  int nBytes = recv(m_accept, buf, data_size - bytes_received, 0);
    //  for (int i = 0; i < nBytes; i++)  // maybe can use memcpy, maybe faster
    //  {
    //      decode.emplace_back(buf[i]);
    //  }
    //  cout << bytes_received << endl;
    //  bytes_received += nBytes;
    // } while (bytes_received < data_size);
    char *recv_char = new char[data_size];
    std::vector<uchar> decode(data_size, 0);
    int index = 0;
    int bytes_received = 0;
    int count = data_size;
    while (count > 0)// if count >= 0, dead loop
    {
        int iRet = recv(m_accept, recv_char, count, 0);
        if (index >= data_size) index = data_size;
        memcpy(&decode[index], recv_char , iRet);
        index += iRet;
        if (!iRet) { return -1; }
        count -= iRet;
    }
    // decode message
    frame = imdecode(decode, cv::IMREAD_COLOR);
    // push into Model's queueMat
    model.mtxQueueImg.lock();
    model.queueMat.push(frame);
    model.mtxQueueImg.unlock();
    return true;
}

bool ServerManager::free_connect() {
    m_cap.release();
    close(m_accept);
    close(m_server);
    return true;
}

.h文件

#pragma once
#include "CV_Classify.h"
#include "CV_Detect.h"
#include "ServerManager.h"
#include <opencv2/opencv.hpp>
#include <mutex>
#include <queue>
#include <unistd.h> // usleep
#include <thread>
#include "cJSON.h"
#include <string>

using namespace std;
using namespace cv;

class ModelManager{
public:
    Detect objdetect;
    Classify objclassify;
    std::mutex mtxQueueDet;  // mutex for detect queue
    std::mutex mtxQueueImg;  // mutex for image queue
    std::mutex mtxQueueCls;  // mutex for classify queue
    std::queue<cv::Mat> queueMat;
    std::queue<ObjDetectOutput> queueDetOut;// Detect queue
    std::queue<ObjClassifyOutput> queueClsOut;// Classify queue

    bool DetectFlag = true;
    bool ClassifyFlag = true;
    bool empty_flag = false;
    friend class ServerManager;
public:
    void initDetectModel() ;
    void initClassifyModel() ;
    void DetectImg();
    void ClassifyImg();
    void getClsResult(ObjClassifyOutput &output);
    // ObjClassifyOutput getClsResult();
    char* createJson();
};

.cpp文件

部分有删减,createJson可参考使用,利用json来传递值

#include "ModelManager.h"

void ModelManager::initDetectModel()
{
    std::string config_path = "DetectConfig.yaml";
    objdetect.Init(config_path, 1);
}
void ModelManager::initClassifyModel()
{
    std::string config_path = "ClassiflyConfig.yaml";
    objclassify.Init(config_path, 1);
}

void ModelManager::DetectImg()
{
    DetectInput detect_input;
    DetectOutput detect_output;
    cv::Mat frame;
    size_t mm = 0;
    while(1)
    {
        if (queueMat.empty())
        {
            if(!DetectFlag)
            {
                break;
            }
            usleep(2000);
            continue;
        }
        // get image from queueMat
        mtxQueueImg.lock();
        frame = queueMat.front();
        queueMat.pop();
        mtxQueueImg.unlock();
        // run model
        objdetect.Run(detect_input, detect_output);
        // push detect result into queueDetOut
        mtxQueueDet.lock();
        queueDetOut.push(detect_output);
        // cout << "detect run !!" << endl;
        mtxQueueDet.unlock();
    }
    return;
}
void ModelManager::ClassifyImg()
{
    ObjClassifyInput input;
    ObjClassifyOutput output;
    cv::Mat frame;
    Detoutput detect_result;
    while(1)
    {
        if (queueDetOut.empty())
        {
            if(!ClassifyFlag)
            {
                break;
            }
            usleep(2000);
            continue;
        }
        // get detect from queueDetOut
        mtxQueueDet.lock();
        detect_result = queueDetOut.front();
        queueDetOut.pop();
        mtxQueueDet.unlock();
        // run model
        objclassify.Run(input, output);
        // push cls result into queueClsOut
        mtxQueueCls.lock();
        queueClsOut.push(output);
        mtxQueueCls.unlock();
    }
    return;
}

void ModelManager::getClsResult(ObjClassifyOutput& output){
    if (queueClsOut.empty()){
        output.object_list.object_num = -1;  // -1 is now empty;
        return; // must return in thread otherwise cant use &output
    }
    output = queueClsOut.front();
    queueClsOut.pop();
    return;
}

char* ModelManager::createJson()  // dont know why cant use &value, need return value
{
    mtxQueueCls.lock();
    ObjClassifyOutput output;
    getClsResult(output);
    mtxQueueCls.unlock();

    if (output.object_list.object_num == -1){
        return nullptr;
    }
    // prepare send data json
    cJSON* json_object_list = NULL;
    cJSON* json_ObjClassifyOutput = NULL;

    json_ObjClassifyOutput = cJSON_CreateObject();
    json_object_list = cJSON_CreateObject();
    cJSON_AddItemToObject(json_ObjClassifyOutput, "object_list", json_object_list);

    int obj_num = output.object_list.object_num;
    cJSON_AddNumberToObject(json_object_list, "object_num", obj_num);
    for (int i = 0; i < obj_num; ++i){
        cJSON* json_object = cJSON_CreateObject();
        cJSON* json_box = cJSON_CreateObject();
        cJSON_AddNumberToObject(json_box,"x", output.object_list.object[i].bbox.x);
        cJSON_AddNumberToObject(json_box,"y", output.object_list.object[i].bbox.y);
        cJSON_AddNumberToObject(json_box,"w", output.object_list.object[i].bbox.w);
        cJSON_AddNumberToObject(json_box,"h", output.object_list.object[i].bbox.h);
        cJSON_AddItemToObject(json_object,"bbox", json_box);
        cJSON_AddNumberToObject(json_object, "classes", output.object_list.object[i].classes);
        cJSON_AddNumberToObject(json_object, "objectness", output.object_list.object[i].objectness);
        // double prob = output.object_list.object[i].prob;
        // cJSON_AddNumberToObject(json_object, "prob", prob); // pointer cant use?
        string str = "object" + to_string(i);
        cJSON_AddItemToObject(json_object_list, str.c_str(), json_object);
        // printf("prob: %f", output.object_list.object[i].prob);
    }

    char* json_output = cJSON_Print(json_ObjClassifyOutput);
    cJSON_Delete(json_ObjClassifyOutput);
    return json_output;
}



#include <../include/ServerManager.h>
#include <../include/ModelManager.h>
#include <thread>
#define PORT 8080
void recvServer(ServerManager& s, ModelManager& model){
    int idx = 0;

    while (true){
        // auto start = std::chrono::steady_clock::now();
        cv::Mat frame;
        s.receive_data_frame(frame, model);
        // cal time cost
        // auto end = std::chrono::steady_clock::now();
        // std::chrono::duration<double, std::milli> elapsed = end - start;
        // std::cout << "recv execution time: " << elapsed.count() << " ms\n";
        if (frame.empty()) {
            usleep(2000);
            continue;
        }
        // cv::imwrite("image"+to_string(idx++)+".jpg", frame);
        std::cout << "Image " << idx++ <<" received !!" << std::endl;
    }
}

void sendServer(ServerManager& s, ModelManager& model){
    while (true){
        if (s.send_data_frame(model)) {
            cout << "send success!!" << endl;
            cout << endl;
        }else{
            // cout << "send fail!!" << endl;
            usleep(2000);
        }
    }
}

int main()
{
    ServerManager s;
    ModelManager model;
    model.initDetectModel();
    model.initClassifyModel();
    cout << endl;
    s.initialization(PORT);
    s.build_connect();
    s.acceptClient();

    thread recv_server(recvServer, std::ref(s), std::ref(model));
    thread send_server(sendServer, std::ref(s), std::ref(model));
    thread detect(&ModelManager::DetectImg, &model);
    thread classfy(&ModelManager::ClassifyImg, &model);
    detect.join();
    classfy.join();
    recv_server.join();
    send_server.join();
    return 0;
}


import json
import socket
import struct
import time
from multiprocessing import JoinableQueue
from threading import Thread

import os
from natsort import ns, natsorted

host = '192.168.0.2'  # '192.168.0.2' 'localhost'
port = 8080

def img_encode(img_path):
    img = cv2.imread(img_path)
    # img = cv2.resize(img, (500, 500), interpolation=cv2.INTER_CUBIC)
    img_param = [95]  # 图片压缩率0-100
    _, img = cv2.imencode('.jpg', img, img_param)
    img = img.tobytes()
    return img

def img_product(img_queue, path, path_mode='image'):
    if path_mode == 'image':
        image = img_encode(path)
        img_queue.put(image)
    elif path_mode == 'dir':
        dir_list = os.listdir(path)
        files = natsorted(dir_list, alg=ns.PATH)  # 顺序读取文件名
        for filename in files:
            img_path = path + '/' + filename
            image = img_encode(img_path)
            img_queue.put(image)
        img_queue.put('E')
    img_queue.join()

def server_consumer(img_queue):
    while True:
        start = int(round(time.time() * 1000))
        # 1. get img from queue
        img_obj = img_queue.get()
        img_queue.task_done()
        # get end signal
        if img_obj[0] == 'E':
        client.close()
        break
        # 2. send package(img_bytes_size, img_bytes)
        pack_size = struct.pack("i", len(img_obj))
        client.send(pack_size + img_obj)
        end = int(round(time.time() * 1000))

        data = client.recv(65536)
        json_str = data.decode('utf8', 'ignore').strip(b'\x00'.decode())
        results = json.loads(json_str)
        end = int(round(time.time() * 1000))
        end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        print('send and recv cost time: ', (end - start))
        print(results)

        if __name__ == '__main__':
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect((host, port))
        img_dir = 'data'
        one_img = './data/image.jpg'
        mode = 'dir'

        img_jq = JoinableQueue()
        producer = Thread(target=img_product, args=(img_jq, img_dir, mode,))
        consumer = Thread(target=server_consumer, args=(img_jq,))
        producer.daemon = True  # set daemon but not set join()

        producer.start()
        consumer.start()

    # producer.join() // 让生产者先关闭,防止close错误
        consumer.join()