Spark Streaming 和 Drools 结合的 HelloWorld 版
阅读原文时间:2023年07月11日阅读:1

Spark Streaming 与 Drools 规则引擎结合的测试示例

package com.dinpay.bdp.rcp.service;

import java.math.BigDecimal;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.kie.api.KieServices;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;

import com.dinpay.bdp.rcp.metaq.MetaQReceiver;
import com.dinpay.bdp.rcp.streaming.StreamingUtil;
import com.dinpay.bdp.rcp.util.CodisUtil;
import com.dinpay.bdp.rcp.util.Constant;
import com.dinpay.dpp.rcp.po.Order;

import redis.clients.jedis.Jedis;
import scala.Tuple2;

/**
* 同卡号单日最大交易金额测试
* @author ll-t150
*
*/
public class SparkDroolsTest {

public static Logger logger = Logger.getLogger(SparkDroolsTest.class);  
public static final DateFormat df = new SimpleDateFormat("yyyyMMdd");

 public static void main(String\[\] args) {  
     String zkConnect=Constant.METAZK;  
     String zkRoot="/meta";  
     String topic=Constant.ORDERTOPIC;  
     String group=Constant.STREAMGROUP;  
     //屏蔽日志  
     Logger.getLogger("org.apache.spark").setLevel(Level.OFF);  
     logger.info("metaq configuration:"+zkConnect+"--"+topic+"--"+group);  
     SparkConf sparkConf = new SparkConf().setAppName("SparkDroolsTest").setMaster("local\[2\]");  
     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));  
     //从metaq取消息  
     JavaReceiverInputDStream<Order> lines = ssc.receiverStream(new MetaQReceiver(zkConnect,zkRoot,topic,group));

     JavaDStream<Order> words = lines.flatMap(new FlatMapFunction<Order, Order>() {  
        @Override  
        public Iterable<Order> call(Order order) throws Exception {  
            return Arrays.asList(new Order\[\]{order});  
        }  
    });

    //同卡号单日交易最大次数 统计包括成功和未成功的订单  
    JavaPairDStream<String, Integer> cardCntPairs = getCardJavaPair(words);  
    save2Codis(cardCntPairs);  
    ssc.start();  
    ssc.awaitTermination();  
    ssc.close();  
 }

 @SuppressWarnings({ "unchecked", "serial" })  
    public static <T> JavaPairDStream<String, T>  getCardJavaPair(JavaDStream<Order> words){  
         JavaPairDStream<String, T> pairs = null;  
                 //次数统计  
                 pairs = (JavaPairDStream<String, T>) words.mapToPair(new PairFunction<Order, String, Integer>() {  
                    @Override  
                    public Tuple2<String, Integer> call(Order order) {  
                        Jedis jedis = CodisUtil.getJedisPool().getResource();  
                        String cardCntkey = order.getSystemId()+"\_CNT\_"+order.getPayerCardNo()+"\_"+df.format(new Date());  
                        //拼接key,先到codis里面查找对应的key是否存在,若存在就直接取对应的值,然后取值加1  
                        String value = jedis.get(cardCntkey);  
                        if (StringUtils.isEmpty(value)) {  
                            return new Tuple2<String, Integer>(cardCntkey, 1);  
                        } else {  
                            return new Tuple2<String, Integer>(cardCntkey, Integer.parseInt(value) + 1);  
                        }  
                    }  
                });  
                return pairs;  
         }

     /\*\*  
     \* 将计算出的数据保存到codis中  
     \* @param pair  
     \*/  
    @SuppressWarnings("serial")  
    public static <T> void save2Codis(JavaPairDStream<String, T> pair) {  
        pair.foreachRDD(new VoidFunction2<JavaPairRDD<String,T>,Time>() {  
            @Override  
            public void call(JavaPairRDD<String, T> rdd, Time time) throws Exception {  
                rdd.foreach(new VoidFunction<Tuple2<String,T>>() {  
                    @Override  
                    public void call(Tuple2<String, T> tp) throws Exception {  
                            Jedis jedis = CodisUtil.getJedisPool().getResource();  
                            jedis.set(tp.\_1(), String.valueOf(tp.\_2()));  
                            logger.info(tp.\_1() + ">>>" + tp.\_2()+",保存到Codis完成!");  
                            KieServices kieServices = KieServices.Factory.get();  
                            KieContainer kieContainer = kieServices.getKieClasspathContainer();  
                            KieSession kieSession = kieContainer.newKieSession("helloworld");  
                            ChannAmount objectChannel = new ChannAmount();  
                            objectChannel.setAmount(Integer.parseInt(String.valueOf(tp.\_2())));  
                            objectChannel.setChannel(tp.\_1());  
                            kieSession.insert(objectChannel);  
                            kieSession.fireAllRules();  
                            if(jedis !=null){  
                                jedis.close();  
                            }  
                    }  
                });  
            }  
        });  
    }

}

关于配置文件的设置

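kmodule.xml文件(通常放在 src/main/resources/META-INF/ 目录下;原文中的 XML 内容在转载时丢失,以下是根据代码中的会话名 "helloworld" 和规则包名 "rules" 推断的示例,请以实际工程为准)

<?xml version="1.0" encoding="UTF-8"?>
<kmodule xmlns="http://www.jboss.org/kie/6.0.0/kmodule">
    <kbase name="rules" packages="rules">
        <ksession name="helloworld"/>
    </kbase>
</kmodule>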


riskMonitor.drl内容

package rules;

import com.dinpay.bdp.rcp.service.ChannAmount;
// Fires once for each ChannAmount fact in the session whose amount is greater than 2.
// No binding variable (such as "m") is declared here; the pattern matches the fact directly.
// NOTE(review): the printed message talks about a 5-minute transaction amount, while the
// Java code appears to put a per-day order count into ChannAmount.amount - confirm the wording.
rule "channel"
when
ChannAmount(amount>2)
then
System.out.println("Drools规则实现:该渠道最近5分钟交易金额超过2次 ");
end

测试OK!