SpringCloud个人笔记-04-Stream初体验
阅读原文时间:2023年07月08日阅读:5
  • Spring Cloud Stream 是一个构建消息驱动微服务的框架

    • 应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与消息中间件交互。
    • 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前仅支持RabbitMQ、Kafka。
  • ### 为什么需要SpringCloud Stream消息驱动呢?

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,

后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

  • *   Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是从队列中发送消息的。)通道与外界交流。

            通道通过指定中间件的Binder实现与外部代理连接。业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可


  • ### 1.基础消息发送消费


http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0

<groupId>com.huarui</groupId>  
<artifactId>sb\_cloud\_stream</artifactId>  
<version>0.0.1-SNAPSHOT</version>  
<name>sb\_cloud\_stream</name>  
<description>Demo project for Spring Boot</description>

<parent>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-parent</artifactId>  
    <version>1.5.13.RELEASE</version>  
    <relativePath/> <!-- lookup parent from repository -->  
</parent>

<properties>  
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>  
    <java.version>1.8</java.version>  
    <spring-cloud.version>Edgware.SR3</spring-cloud.version>  
</properties>

<dependencies>

    <!-- rabbit -->  
    <dependency>  
        <groupId>org.springframework.cloud</groupId>  
        <artifactId>spring-cloud-starter-bus-amqp</artifactId>  
    </dependency>

    <!-- stream -->  
    <dependency>  
        <groupId>org.springframework.cloud</groupId>  
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>  
    </dependency>

    <!-- eureka client -->  
    <dependency>  
        <groupId>org.springframework.cloud</groupId>  
        <artifactId>spring-cloud-starter-eureka</artifactId>  
    </dependency>

    <dependency>  
        <groupId>org.springframework.boot</groupId>  
        <artifactId>spring-boot-starter-test</artifactId>  
        <scope>test</scope>  
    </dependency>  
</dependencies>

<dependencyManagement>  
    <dependencies>  
        <dependency>  
            <groupId>org.springframework.cloud</groupId>  
            <artifactId>spring-cloud-dependencies</artifactId>  
            <version>${spring-cloud.version}</version>  
            <type>pom</type>  
            <scope>import</scope>  
        </dependency>  
    </dependencies>  
</dependencyManagement>

<build>  
    <plugins>  
        <plugin>  
            <groupId>org.springframework.boot</groupId>  
            <artifactId>spring-boot-maven-plugin</artifactId>  
        </plugin>  
    </plugins>  
</build>

pom.xml

1 spring:
2 application:
3 name: sb_cloud_config_client
4 # #RebbitMq
5 rabbitmq:
6 host: 39.108.85.204
7 port: 5672
8 username: guest
9 password: guest
10
11
12 eureka:
13 client:
14 serviceUrl:
15 defaultZone: http://39.108.85.204:8761/eureka/
16 instance:
17 #注册时使用ip而不是主机名
18 prefer-ip-address: true
19 #指定此实例的ip
20 #ip-address: ip
21 server:
22 port: 9004

application.yml

public class Girl implements Serializable {

private String id;  
private String name;  
private Date date;

public Girl(){

}

public Girl(String id, String name, Date date) {  
    this.id = id;  
    this.name = name;  
    this.date = date;  
}

public String getId() {  
    return id;  
}

public void setId(String id) {  
    this.id = id;  
}

public String getName() {  
    return name;  
}

public void setName(String name) {  
    this.name = name;  
}

public Date getDate() {  
    return date;  
}

public void setDate(Date date) {  
    this.date = date;  
}

@Override  
public String toString() {  
    return "Girl{" +  
            "id='" + id + '\\'' +  
            ", name='" + name + '\\'' +  
            ", date=" + date +  
            '}';  
}  

}

Girl.java

package com.huarui.util;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JsonUtils {

private static Logger log = LoggerFactory.getLogger(JsonUtils.class);  
private static JsonFactory jsonfactory = new JsonFactory();  
private static ObjectMapper mapper = new ObjectMapper(jsonfactory);

public static <T> T parseObject(String json,Class<T> clzz) {  
    //设置JSON时间格式  
    SimpleDateFormat myDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
    mapper.setDateFormat(myDateFormat);  
    try {  
        return mapper.readValue(json, clzz);  
    } catch (JsonParseException e) {  
        e.printStackTrace();  
    } catch (JsonMappingException e) {  
        e.printStackTrace();  
    } catch (IOException e) {  
        e.printStackTrace();  
    }  
    return null;  
}

/\*\*  
 \* json转map  
 \*  
 \* @param json  
 \* @return  
 \*/  
public static Map<String,Object> parseMap(String json){  
    TypeReference<HashMap<String,Object>> typeRef = new TypeReference<HashMap<String,Object>>() {};  
    try {  
        return mapper.readValue(json, typeRef);  
    } catch (JsonParseException e) {  
        log.error("字符串转json出错!"+json, e);  
    } catch (JsonMappingException e) {  
        log.error("json映射map出错!"+json, e);  
    } catch (IOException e) {  
        log.error("json转map流错误!"+json, e);  
    }  
    return null;  
}

public static <T> List<T> parseList(String json,Class<?> clazz){  
    TypeFactory t = TypeFactory.defaultInstance();  
    try {  
        List<T> list = mapper.readValue(json,t.constructCollectionType(ArrayList.class,clazz));  
        return list;  
    } catch (JsonParseException e) {  
        e.printStackTrace();  
    } catch (JsonMappingException e) {  
        e.printStackTrace();  
    } catch (IOException e) {  
        e.printStackTrace();  
    }  
    return null;  
}

/\*\*  
 \* 对像转json字符串  
 \*  
 \* @param obj  
 \* @return  
 \*/  
public static String parseString(Object obj){  
    String result = null;  
    try {  
        SimpleDateFormat myDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
        mapper.setDateFormat(myDateFormat);  
        result = mapper.writeValueAsString(obj);  
    } catch (JsonParseException e) {  
        log.error("字符串转json出错!", e);  
    } catch (JsonMappingException e) {  
        log.error("json映射map出错!", e);  
    } catch (IOException e) {  
        log.error("json转map流错误!", e);  
    }  
    return result;  
}  

}

JsonUtils.java

package com.huarui.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
* 自定义input output
*/
public interface StreamClient {

public static final String INPUT = "myinput";

public static final String OUTPUT = "myoutput";

public static final String INPUTTO = "myinputto";

public static final String OUTPUTTO = "myoutputto";

/\*\*  
 \* 相当于消费者consumer,它是从队列中接收消息的  
 \*/  
@Input(INPUT)  
SubscribableChannel input();

/\*\*  
 \* 相当于生产者producer,它是从队列中发送消息的  
 \*/  
@Output(OUTPUT)  
MessageChannel output();

}

@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {

/\*\*  
 \* 基本 INPUT  
 \* @param girlStr  
 \*/  
@StreamListener(StreamClient.INPUT)  
public void process(String girlStr){  
    Girl girl = JsonUtils.parseObject(girlStr, Girl.class);  
    System.out.println("process 收到: "+girl);  
}  

}

@RestController
public class SendMessageController {

@Autowired  
private StreamClient streamClient;

@GetMapping("/sendGirl/{name}")  
public String sendGirl(@PathVariable String name){  
    Girl girl = new Girl(UUID.randomUUID().toString(),name,new Date());  
    streamClient.output().send(MessageBuilder.withPayload(girl).build());  
    return name+" send ok";  
}  

}

  • ### 避免启动项目时找不到该队列,手动创建 myinput队列

访问  http://localhost:9004/sendGirl/huahua 可见消息发送 并被消费,控制台打印的消息就可以证明

  • ### 2.消息分组 一条消息只让组内一个实例消费

spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zon: GMT+8
application:
name: sb_cloud_config_client

#RebbitMq

rabbitmq:
host: 39.108.85.204
port: 5672
username: guest
password: guest
cloud:
stream:
bindings:
myoutput: # 自定义output
content-type: application/json #消息发送的格式 发送端指定即可
destination: sb-cloud-stream
myinput: # 自定义input
destination: sb-cloud-stream
group: girlfriend #分组 同一组的消息只有一个实例接收

yml配置文件中添加group 即可实现。

  • ### 3.接收消息后  并回馈消息

spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zon: GMT+8
application:
name: sb_cloud_config_client

#RebbitMq

rabbitmq:
host: 39.108.85.204
port: 5672
username: guest
password: guest
cloud:
stream:
bindings:
myoutput: # 自定义output
content-type: application/json #消息发送的格式 发送端指定即可
destination: sb-cloud-stream
myinput: # 自定义input
destination: sb-cloud-stream
group: girlfriend #分组 同一组的消息只有一个实例接收
myoutputto: # 自定义output (回馈使用)
content-type: application/json #消息发送的格式 发送端指定即可
destination: sb-cloud-stream-to
myinputto: # 自定义input(回馈使用)
destination: sb-cloud-stream-to

public interface StreamClient {

public static final String INPUT = "myinput";

public static final String OUTPUT = "myoutput";

public static final String INPUTTO = "myinputto";

public static final String OUTPUTTO = "myoutputto";

/\*\*  
 \* 相当于消费者consumer,它是从队列中接收消息的  
 \*/  
@Input(INPUT)  
SubscribableChannel input();

/\*\*  
 \* 相当于生产者producer,它是从队列中发送消息的  
 \*/  
@Output(OUTPUT)  
MessageChannel output();

/\*\*  
 \* 相当于消费者consumer,它是从队列中接收消息的 (反馈使用)  
 \*/  
@Input(INPUTTO)  
SubscribableChannel inputto();

/\*\*  
 \* 相当于生产者producer,它是从队列中发送消息的 (反馈使用)  
 \*/  
@Output(OUTPUTTO)  
MessageChannel outputto();

}

package com.huarui.message;

import com.huarui.entity.Girl;
import com.huarui.util.JsonUtils;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
* Created by lihui on 2019/2/28.
*/

@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {

/\*\*  
 \* 基本 INPUT  
 \* @param girlStr  
 \*/  
@StreamListener(StreamClient.INPUT)  
public void process(String girlStr){  
    Girl girl = JsonUtils.parseObject(girlStr, Girl.class);  
    System.out.println("process 收到: "+girl);  
}  

/\*\*  
 \* 我们接收到消息后,给别人一个反馈  
 \* SpringCloud stream 给我们提供了一个SendTo注解可以帮我们干这些事情  
 \* @param girlStr  
 \* @return  
 \*/  
@StreamListener(StreamClient.INPUTTO)  
@SendTo(StreamClient.OUTPUTTO)  
public String processTo(String girlStr){  
    Girl girl = JsonUtils.parseObject(girlStr, Girl.class);  
    System.out.println("收到: "+girl);  
    return JsonUtils.parseString(girl);  
}

@StreamListener(StreamClient.OUTPUTTO)  
public void process2(String girlStr){  
    Girl girl = JsonUtils.parseObject(girlStr, Girl.class);  
    System.out.println("ta 已收到: "+girl);  
}

}

@SendTo 注解就能实现消息接收后回馈消息, return 返回值就是回馈的消息

访问  http://localhost:9004/sendGirlTo/huahua 可见消息被消费后 并回馈了一条消息,控制台打印的消息就可以证明


本项目地址: https://github.com/youxiu326/sb_cloud_stream.git

参考博客: https://i.cnblogs.com/EditPosts.aspx?postid=10464571&update=1