关于sparkStreaming的测试Drools框架结合版
package com.dinpay.bdp.rcp.service;
import java.math.BigDecimal;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.kie.api.KieServices;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import com.dinpay.bdp.rcp.metaq.MetaQReceiver;
import com.dinpay.bdp.rcp.streaming.StreamingUtil;
import com.dinpay.bdp.rcp.util.CodisUtil;
import com.dinpay.bdp.rcp.util.Constant;
import com.dinpay.dpp.rcp.po.Order;
import redis.clients.jedis.Jedis;
import scala.Tuple2;
/**
* 同卡号单日最大交易金额测试
* @author ll-t150
*
*/
public class SparkDroolsTest {
public static Logger logger = Logger.getLogger(SparkDroolsTest.class);
public static final DateFormat df = new SimpleDateFormat("yyyyMMdd");
public static void main(String\[\] args) {
String zkConnect=Constant.METAZK;
String zkRoot="/meta";
String topic=Constant.ORDERTOPIC;
String group=Constant.STREAMGROUP;
//屏蔽日志
Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
logger.info("metaq configuration:"+zkConnect+"--"+topic+"--"+group);
SparkConf sparkConf = new SparkConf().setAppName("SparkDroolsTest").setMaster("local\[2\]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
//从metaq取消息
JavaReceiverInputDStream<Order> lines = ssc.receiverStream(new MetaQReceiver(zkConnect,zkRoot,topic,group));
JavaDStream<Order> words = lines.flatMap(new FlatMapFunction<Order, Order>() {
@Override
public Iterable<Order> call(Order order) throws Exception {
return Arrays.asList(new Order\[\]{order});
}
});
//同卡号单日交易最大次数 统计包括成功和未成功的订单
JavaPairDStream<String, Integer> cardCntPairs = getCardJavaPair(words);
save2Codis(cardCntPairs);
ssc.start();
ssc.awaitTermination();
ssc.close();
}
@SuppressWarnings({ "unchecked", "serial" })
public static <T> JavaPairDStream<String, T> getCardJavaPair(JavaDStream<Order> words){
JavaPairDStream<String, T> pairs = null;
//次数统计
pairs = (JavaPairDStream<String, T>) words.mapToPair(new PairFunction<Order, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Order order) {
Jedis jedis = CodisUtil.getJedisPool().getResource();
String cardCntkey = order.getSystemId()+"\_CNT\_"+order.getPayerCardNo()+"\_"+df.format(new Date());
//拼接key,先到codis里面查找对应的key是否存在,若存在就直接取对应的值,然后取值加1
String value = jedis.get(cardCntkey);
if (StringUtils.isEmpty(value)) {
return new Tuple2<String, Integer>(cardCntkey, 1);
} else {
return new Tuple2<String, Integer>(cardCntkey, Integer.parseInt(value) + 1);
}
}
});
return pairs;
}
/\*\*
\* 将计算出的数据保存到codis中
\* @param pair
\*/
@SuppressWarnings("serial")
public static <T> void save2Codis(JavaPairDStream<String, T> pair) {
pair.foreachRDD(new VoidFunction2<JavaPairRDD<String,T>,Time>() {
@Override
public void call(JavaPairRDD<String, T> rdd, Time time) throws Exception {
rdd.foreach(new VoidFunction<Tuple2<String,T>>() {
@Override
public void call(Tuple2<String, T> tp) throws Exception {
Jedis jedis = CodisUtil.getJedisPool().getResource();
jedis.set(tp.\_1(), String.valueOf(tp.\_2()));
logger.info(tp.\_1() + ">>>" + tp.\_2()+",保存到Codis完成!");
KieServices kieServices = KieServices.Factory.get();
KieContainer kieContainer = kieServices.getKieClasspathContainer();
KieSession kieSession = kieContainer.newKieSession("helloworld");
ChannAmount objectChannel = new ChannAmount();
objectChannel.setAmount(Integer.parseInt(String.valueOf(tp.\_2())));
objectChannel.setChannel(tp.\_1());
kieSession.insert(objectChannel);
kieSession.fireAllRules();
if(jedis !=null){
jedis.close();
}
}
});
}
});
}
}
关于配置文件的设置
kmodule.xml文件
riskMonitor.drl内容
package rules;
import com.dinpay.bdp.rcp.service.ChannAmount;
//其中m为对象objectChannel 的引用
rule "channel"
when
ChannAmount(amount>2)
then
System.out.println("Drools规则实现:该渠道最近5分钟交易金额超过2次 ");
end
测试OK!
手机扫一扫
移动阅读更方便
你可能感兴趣的文章