最近有个需求,需要使用flinksql读写redis。由于官网上并没有redis的connector,在网上找了很久,开源的几个connector又没法满足要求,所以这里就自己动手实现了一个。已经适配了各个版本的flink,从flink1.12到flink1.15。
简单介绍一下功能吧:
@Before
public void init() {
/\*\*
设置当前属于测试模式,在这个测试模式下,当流表数据消费完成后程序会停止,方便测试,这个模式默认false
\*/
RedisOptions.IS\_TEST = true;
RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);
List<String> lists = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
lists.add("{\\n" +
" \\"number\\": " + i + ",\\n" +
" \\"name\\": \\"学生" + i + "\\",\\n" +
" \\"school\\": \\"学校" + ((i % 3) + 1) +"\\",\\n" +
" \\"class\_id\\": " + ((i % 10) + 1) +"\\n" +
"}");
}
/\*\*
\* 初始化学生数据
\*/
for (int i = 0; i < 1; i++) {
redisOperator.rpush("students", lists.subList(1000 \* i, 1000 \* (i + 1)));
}
/\*\*
\* 初始化班级数据
\*/
for(int i = 0;i < 10;i++) {
redisOperator.set(String.valueOf(i + 1),"银河" + (i + 1) + "班");
}
/\*\*
\* 初始化学校班级数据
\*/
for(int j = 1;j < 4;j++) {
for (int i = 1; i < 11; i++) {
redisOperator.hset("学校" + j, String.valueOf(i), "银河" + i + "班");
}
}
}
@Test
public void testBlpopSQL() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
String source =
"CREATE TABLE students\\n" +
"(\\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" school string, \\n" +
" class\_id BIGINT \\n" +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'database'='0',\\n" +
" 'key'='students',\\n" +
" 'format'='json',\\n" +
" 'batch-fetch-rows'='1000',\\n" +
" 'json.fail-on-missing-field' = 'false',\\n" +
" 'json.ignore-parse-errors' = 'true',\\n" +
" 'command'='BLPOP'\\n" +
" )";
String sink =
"CREATE TABLE sink\_students\\n" +
"(\\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" school string, \\n" +
" class\_id BIGINT \\n" +
") \\n" +
"WITH (\\n" +
" 'connector'='print'\\n" +
" )";
tEnv.executeSql(source);
tEnv.executeSql(sink);
String sql =
" insert into sink\_students select \* from students";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
}
@Before
public void init() {
/\*\*
设置当前属于测试模式,在这个测试模式下,当流表数据消费完成后程序会停止,方便测试,这个模式默认false
\*/
RedisOptions.IS\_TEST = true;
RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);
List<String> lists = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
lists.add("{\\n" +
" \\"number\\": " + i + ",\\n" +
" \\"name\\": \\"学生" + i + "\\",\\n" +
" \\"school\\": \\"学校" + ((i % 3) + 1) +"\\",\\n" +
" \\"class\_id\\": " + ((i % 10) + 1) +"\\n" +
"}");
}
/\*\*
\* 初始化学生数据
\*/
for (int i = 0; i < 1; i++) {
redisOperator.rpush("students", lists.subList(1000 \* i, 1000 \* (i + 1)));
}
/\*\*
\* 初始化班级数据
\*/
for(int i = 0;i < 10;i++) {
redisOperator.set(String.valueOf(i + 1),"银河" + (i + 1) + "班");
}
/\*\*
\* 初始化学校班级数据
\*/
for(int j = 1;j < 4;j++) {
for (int i = 1; i < 11; i++) {
redisOperator.hset("学校" + j, String.valueOf(i), "银河" + i + "班");
}
}
}
@Test
public void testGetSQL() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
String source =
"CREATE TABLE students\\n" +
"(\\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" school string, \\n" +
" class\_id BIGINT, \\n" +
" proctime as PROCTIME() \\n" +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'database'='0',\\n" +
" 'key'='students',\\n" +
" 'format'='json',\\n" +
" 'batch-fetch-rows'='1000',\\n" +
" 'json.fail-on-missing-field' = 'false',\\n" +
" 'json.ignore-parse-errors' = 'true',\\n" +
" 'command'='BLPOP'\\n" +
" )";
/\*\*
这里需要注意的是,由于使用get命令,而且没有加format属性,所以维表只能有两个字段,多了也识别不到,
详细可以看源码里的注释
\*/
String daeamon =
"CREATE TABLE classes\\n" +
"(\\n" +
" class\_id BIGINT ,\\n" +
" class\_name string " +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'lookup.cache.max-rows'='1000',\\n" +
" 'lookup.cache.ttl'='3600',\\n" +
" 'lookup.cache.load-all'='true',\\n" +
" 'database'='0',\\n" +
" 'command'='GET'\\n" +
" )";
String sink =
"CREATE TABLE sink\_students\\n" +
"(\\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" school string, \\n" +
" class\_id BIGINT, \\n" +
" class\_name string \\n" +
") \\n" +
"WITH (\\n" +
" 'connector'='print'\\n" +
" )";
tEnv.executeSql(source);
tEnv.executeSql(daeamon);
tEnv.executeSql(sink);
/\*\*
这里join的字段必须是GET命令的key
\*/
String sql =
" insert into sink\_students "
+ " select s.number,s.name,s.school,s.class\_id,d.class\_name from students s"
+ " left join classes for system\_time as of s.proctime as d on d.class\_id = s.class\_id";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
}
@Test
public void testHGetSQL() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
String source =
"CREATE TABLE students\\n" +
"(\\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" school string, \\n" +
" class\_id BIGINT, \\n" +
" proctime as PROCTIME() \\n" +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'database'='0',\\n" +
" 'key'='students',\\n" +
" 'format'='json',\\n" +
" 'batch-fetch-rows'='1000',\\n" +
" 'json.fail-on-missing-field' = 'false',\\n" +
" 'json.ignore-parse-errors' = 'true',\\n" +
" 'command'='BLPOP'\\n" +
" )";
/\*\*
这里需要注意的是,由于使用hget命令,而且没有加format属性,所以维表只能有三个字段,多了也识别不到,
详细可以看源码里的注释
\*/
String daeamon =
"CREATE TABLE classes\\n" +
"(\\n" +
" school string, \\n" +
" class\_id BIGINT ,\\n" +
" class\_name string " +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'lookup.cache.max-rows'='1000',\\n" +
" 'lookup.cache.ttl'='3600',\\n" +
" 'lookup.cache.load-all'='true',\\n" +
" 'database'='0',\\n" +
" 'command'='HGET'\\n" +
" )";
String sink =
"CREATE TABLE sink\_students\\n" +
"(\\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" school string, \\n" +
" class\_id BIGINT, \\n" +
" class\_name string \\n" +
") \\n" +
"WITH (\\n" +
" 'connector'='print'\\n" +
" )";
tEnv.executeSql(source);
tEnv.executeSql(daeamon);
tEnv.executeSql(sink);
/\*\*
这里需要注意的是,由于使用hget命令,这里join的参数两个参数顺序没有关系,真正执行hget命令哪个字段作为key,
哪个字段作为field只与维表定义的时候的字段顺序有关系
\*/
String sql =
" insert into sink\_students "
+ " select s.number,s.name,s.school,s.class\_id,d.class\_name from students s"
+ " left join classes for system\_time as of s.proctime as d on d.class\_id = s.class\_id and d.school = s.school";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
}
@Before
public void init() {
RedisOptions.IS\_TEST = true;
RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);
List<String> lists = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
lists.add("{\\n" +
" \\"number\\": " + i + ",\\n" +
" \\"name\\": \\"学生" + i + "\\",\\n" +
" \\"school\\": \\"学校" + ((i % 3) + 1) +"\\",\\n" +
" \\"class\_id\\": " + ((i % 10) + 1) +"\\n" +
"}");
}
/\*\*
\* 初始化学生数据
\*/
for (int i = 0; i < 1; i++) {
redisOperator.rpush("students", lists.subList(1000 \* i, 1000 \* (i + 1)));
}
/\*\*
\* 初始化班级数据
\*/
for(int i = 0;i < 10;i++) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("class\_id",String.valueOf(i + 1));
jsonObject.put("class\_name","银河" + (i + 1) + "班");
jsonObject.put("remark","remark" + i);
redisOperator.set(String.valueOf(i + 1),jsonObject.toString());
}
/\*\*
\* 初始化学校班级数据
\*/
for(int j = 1;j < 4;j++) {
for (int i = 1; i < 11; i++) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("class\_id",String.valueOf(i));
jsonObject.put("class\_name","银河" + i + "班");
jsonObject.put("remark","remark" + i);
jsonObject.put("school","学校" + j);
redisOperator.hset("学校" + j, String.valueOf(i), jsonObject.toString());
}
}
}
@Test
public void testGetSQL() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
String source =
"CREATE TABLE students\\n" +
"(\\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" school string, \\n" +
" class\_id BIGINT, \\n" +
" proctime as PROCTIME() \\n" +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'database'='0',\\n" +
" 'key'='students',\\n" +
" 'format'='json',\\n" +
" 'batch-fetch-rows'='1000',\\n" +
" 'json.fail-on-missing-field' = 'false',\\n" +
" 'json.ignore-parse-errors' = 'true',\\n" +
" 'command'='BLPOP'\\n" +
" )";
/\*\*
\* 这里测试的核心是维表有format=json配置项,有了format配置项后,字段个数不受限制,但是需要注意的是,作为get命令的key的字段
\* 一定要放在表申明的第一位,并且get命令的value的值使用format格式化后,比如是json格式,则json里一定要包含作为维表查询的
\* join on后面带的作为key的查询列,不然会报空指针异常
\*/
String daeamon =
"CREATE TABLE classes\\n" +
"(\\n" +
" class\_id BIGINT ,\\n" +
" class\_name string ,\\n " +
" remark string " +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'format'='json', \\n" +
" 'password'='123456',\\n" +
" 'lookup.cache.max-rows'='1000',\\n" +
" 'lookup.cache.ttl'='3600',\\n" +
" 'lookup.cache.load-all'='true',\\n" +
" 'database'='0',\\n" +
" 'command'='GET'\\n" +
" )";
String sink =
"CREATE TABLE sink\_students\\n" +
"(\\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" school string, \\n" +
" class\_id BIGINT, \\n" +
" class\_name string, \\n" +
" remark string " +
") \\n" +
"WITH (\\n" +
" 'connector'='print'\\n" +
" )";
tEnv.executeSql(source);
tEnv.executeSql(daeamon);
tEnv.executeSql(sink);
String sql =
" insert into sink\_students "
+ " select s.number,s.name,s.school,s.class\_id,d.class\_name,d.remark from students s"
+ " left join classes for system\_time as of s.proctime as d on d.class\_id = s.class\_id";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
}
@Test
public void testHGetSQL() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
String source =
"CREATE TABLE students\\n" +
"(\\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" school string, \\n" +
" class\_id BIGINT, \\n" +
" proctime as PROCTIME() \\n" +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'database'='0',\\n" +
" 'key'='students',\\n" +
" 'format'='json',\\n" +
" 'batch-fetch-rows'='1000',\\n" +
" 'json.fail-on-missing-field' = 'false',\\n" +
" 'json.ignore-parse-errors' = 'true',\\n" +
" 'command'='BLPOP'\\n" +
" )";
/\*\*
\* 这里测试的核心是维表有format=json配置项,有了format配置项后,字段个数不受限制,但是需要注意的是,作为hget命令的key的字段
\* 一定要放在表申明的第一位,field的字段一定要放在申明的第二位,并且hget命令的value的值使用format格式化后,比如是json格式, \* 则json里一定要包含作为维表查询的 join on后面带的作为key和field的查询列,不然会报空指针异常
\*/
String daeamon =
"CREATE TABLE classes\\n" +
"(\\n" +
" school string, \\n" +
" class\_id BIGINT ,\\n" +
" class\_name string, " +
" remark string " +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'format'='json', \\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'lookup.cache.max-rows'='1000',\\n" +
" 'lookup.cache.ttl'='3600',\\n" +
" 'lookup.cache.load-all'='true',\\n" +
" 'database'='0',\\n" +
" 'command'='HGET'\\n" +
" )";
String sink =
"CREATE TABLE sink\_students\\n" +
"(\\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" school string, \\n" +
" class\_id BIGINT, \\n" +
" class\_name string, \\n" +
" remark string " +
") \\n" +
"WITH (\\n" +
" 'connector'='print'\\n" +
" )";
tEnv.executeSql(source);
tEnv.executeSql(daeamon);
tEnv.executeSql(sink);
String sql =
" insert into sink\_students "
+ " select s.number,s.name,s.school,s.class\_id,d.class\_name,d.remark from students s"
+ " left join classes for system\_time as of s.proctime as d on d.class\_id = s.class\_id and d.school = s.school";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
}
@Before
public void init() {
RedisOptions.IS\_TEST = true;
RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);
List<String> lists = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
lists.add("{\\n" +
" \\"number\\": " + i + ",\\n" +
" \\"name\\": \\"学生" + i + "\\",\\n" +
" \\"school\\": \\"学校" + ((i % 3) + 1) +"\\",\\n" +
" \\"class\_id\\": " + ((i % 10) + 1) +"\\n" +
"}");
}
/\*\*
\* 初始化学生数据
\*/
for (int i = 0; i < 1; i++) {
redisOperator.rpush("students", lists.subList(1000 \* i, 1000 \* (i + 1)));
}
/\*\*
\* 初始化班级数据
\*/
for(int i = 0;i < 10;i++) {
redisOperator.set(String.valueOf(i + 1),"银河" + (i + 1) + "班");
}
/\*\*
\* 初始化学校班级数据
\*/
for(int j = 1;j < 4;j++) {
for (int i = 1; i < 11; i++) {
redisOperator.hset("学校" + j, String.valueOf(i), "银河" + i + "班");
}
}
}
@Test
public void testLPushSQL() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
String source =
"CREATE TABLE students\\n" +
"(\\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" school string, \\n" +
" class\_id BIGINT, \\n" +
" proctime as PROCTIME() \\n" +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'database'='0',\\n" +
" 'key'='students',\\n" +
" 'format'='json',\\n" +
" 'batch-fetch-rows'='1000',\\n" +
" 'json.fail-on-missing-field' = 'false',\\n" +
" 'json.ignore-parse-errors' = 'true',\\n" +
" 'command'='BLPOP'\\n" +
" )";
String daeamon =
"CREATE TABLE classes\\n" +
"(\\n" +
" school string, \\n" +
" class\_id BIGINT ,\\n" +
" class\_name string " +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'lookup.cache.max-rows'='1000',\\n" +
" 'lookup.cache.ttl'='3600',\\n" +
" 'lookup.cache.load-all'='true',\\n" +
" 'database'='0',\\n" +
" 'command'='HGET'\\n" +
" )";
/\*\*
\* 1、这里因为command是LPUSH,所以不需要primary key(number) not enforced, 因为这种命令只支持INSERT语义
\* 2、并行度配置项sink.parallelism没有配置,默认为核心数
\*/
String sink =
"CREATE TABLE sink\_students\\n" +
"(\\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" school string, \\n" +
" class\_id BIGINT, \\n" +
" class\_name string \\n" +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'database'='0',\\n" +
" 'key'='sink\_students\_list',\\n" +
" 'format'='json',\\n" +
" 'batch-fetch-rows'='1000',\\n" +
" 'json.fail-on-missing-field' = 'false',\\n" +
" 'json.ignore-parse-errors' = 'true',\\n" +
" 'command'='LPUSH'\\n" +
" )";
tEnv.executeSql(source);
tEnv.executeSql(daeamon);
tEnv.executeSql(sink);
String sql =
" insert into sink\_students "
+ " select s.number,s.name,s.school,s.class\_id,d.class\_name from students s"
+ " left join classes for system\_time as of s.proctime as d on d.class\_id = s.class\_id and d.school = s.school";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
}
@Test
public void testSet() throws Exception {
long start = System.currentTimeMillis();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
String source =
"CREATE TABLE students\\n" +
"(\\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" school string, \\n" +
" class\_id BIGINT, \\n" +
" proctime as PROCTIME() \\n" +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'database'='0',\\n" +
" 'key'='students',\\n" +
" 'format'='json',\\n" +
" 'batch-fetch-rows'='1000',\\n" +
" 'json.fail-on-missing-field' = 'false',\\n" +
" 'json.ignore-parse-errors' = 'true',\\n" +
" 'command'='BLPOP'\\n" +
" )";
String daeamon =
"CREATE TABLE classes\\n" +
"(\\n" +
" school string, \\n" +
" class\_id BIGINT ,\\n" +
" class\_name string " +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'lookup.cache.max-rows'='1000',\\n" +
" 'lookup.cache.ttl'='3600',\\n" +
" 'lookup.cache.load-all'='true',\\n" +
" 'database'='0',\\n" +
" 'command'='HGET'\\n" +
" )";
/\*\*
\* 1、这里因为command是SET,所以需要一个key,这里key就是使用主键,多个就用下划线拼接起来,
\* 2、并行度配置项sink.parallelism没有配置,默认为核心数
\*/
String sink =
"CREATE TABLE sink\_students\\n" +
"(\\n" +
" school string, \\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" class\_id BIGINT, \\n" +
" class\_name string, \\n" +
" primary key(school,number) not enforced" +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'database'='0',\\n" +
" 'format'='json',\\n" +
" 'batch-fetch-rows'='1000',\\n" +
" 'json.fail-on-missing-field' = 'false',\\n" +
" 'json.ignore-parse-errors' = 'true',\\n" +
" 'command'='SET'\\n" +
" )";
tEnv.executeSql(source);
tEnv.executeSql(daeamon);
tEnv.executeSql(sink);
String sql =
" insert into sink\_students "
+ " select s.school,s.number,s.name,s.class\_id,d.class\_name from students s"
+ " left join classes for system\_time as of s.proctime as d on d.class\_id = s.class\_id and d.school = s.school";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start) + "ms");
}
@Test
public void testHSet() throws Exception {
long start = System.currentTimeMillis();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
String source =
"CREATE TABLE students\\n" +
"(\\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" school string, \\n" +
" class\_id BIGINT, \\n" +
" proctime as PROCTIME() \\n" +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'database'='0',\\n" +
" 'key'='students',\\n" +
" 'format'='json',\\n" +
" 'batch-fetch-rows'='1000',\\n" +
" 'json.fail-on-missing-field' = 'false',\\n" +
" 'json.ignore-parse-errors' = 'true',\\n" +
" 'command'='BLPOP'\\n" +
" )";
String daeamon =
"CREATE TABLE classes\\n" +
"(\\n" +
" school string, \\n" +
" class\_id BIGINT ,\\n" +
" class\_name string " +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'lookup.cache.max-rows'='1000',\\n" +
" 'lookup.cache.ttl'='3600',\\n" +
" 'lookup.cache.load-all'='true',\\n" +
" 'database'='0',\\n" +
" 'command'='HGET'\\n" +
" )";
/\*\*
\* 1、这里因为command是HSET,所以需要一个key和一个field,这里是按照表申明的顺序,第一个作为key,
\* 第二个作为field,由于需要更新,也需要一个主键,这里最好把前两个字段一起作为主键
\* 2、作为sink有一个sink.key.ttl参数可以设置key保存在redis的ttl生存时间,单位秒,默认为-1表示长期保存
\*/
String sink =
"CREATE TABLE sink\_students\\n" +
"(\\n" +
" school string, \\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" class\_id BIGINT, \\n" +
" class\_name string, \\n" +
" primary key(school,number) not enforced" +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'database'='0',\\n" +
" 'format'='json',\\n" +
" 'batch-fetch-rows'='1000',\\n" +
" 'json.fail-on-missing-field' = 'false',\\n" +
" 'json.ignore-parse-errors' = 'true',\\n" +
" 'sink.parallelism' = '16',\\n" +
" 'sink.key.ttl' = '300',\\n" +
" 'command'='HSET'\\n" +
" )";
tEnv.executeSql(source);
tEnv.executeSql(daeamon);
tEnv.executeSql(sink);
String sql =
" insert into sink\_students "
+ " select s.school,s.number,s.name,s.class\_id,d.class\_name from students s"
+ " left join classes for system\_time as of s.proctime as d on d.class\_id = s.class\_id and d.school = s.school";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start) + "ms");
}
@Test
public void testHSetWithKey() throws Exception {
long start = System.currentTimeMillis();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
String source =
"CREATE TABLE students\\n" +
"(\\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" school string, \\n" +
" class\_id BIGINT, \\n" +
" proctime as PROCTIME() \\n" +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'database'='0',\\n" +
" 'key'='students',\\n" +
" 'format'='json',\\n" +
" 'batch-fetch-rows'='1000',\\n" +
" 'json.fail-on-missing-field' = 'false',\\n" +
" 'json.ignore-parse-errors' = 'true',\\n" +
" 'command'='BLPOP'\\n" +
" )";
String daeamon =
"CREATE TABLE classes\\n" +
"(\\n" +
" school string, \\n" +
" class\_id BIGINT ,\\n" +
" class\_name string " +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'lookup.cache.max-rows'='1000',\\n" +
" 'lookup.cache.ttl'='3600',\\n" +
" 'lookup.cache.load-all'='true',\\n" +
" 'database'='0',\\n" +
" 'command'='HGET'\\n" +
" )";
/\*\*
\* 1、这里因为command是HSET,所以需要一个key和一个field,这里配置项指定了key,那么主键拼接就作为field,
\* 使用hset保存到redis
\* 2、作为sink有一个sink.key.ttl参数可以设置key保存在redis的ttl生存时间,单位秒,默认-1表示长期保存
\*/
String sink =
"CREATE TABLE sink\_students\\n" +
"(\\n" +
" school string, \\n" +
" number BIGINT ,\\n" +
" name string,\\n" +
" class\_id BIGINT, \\n" +
" class\_name string, \\n" +
" primary key(number) not enforced" +
") \\n" +
"WITH (\\n" +
" 'connector'='redis',\\n" +
" 'host'='10.201.0.33', \\n" +
" 'port'='6379',\\n" +
" 'redis-mode'='single', \\n" +
" 'password'='123456',\\n" +
" 'database'='0',\\n" +
" 'format'='json',\\n" +
" 'key'='sink\_students\_hset',\\n" +
" 'batch-fetch-rows'='1000',\\n" +
" 'json.fail-on-missing-field' = 'false',\\n" +
" 'json.ignore-parse-errors' = 'true',\\n" +
" 'sink.parallelism' = '16',\\n" +
" 'sink.key.ttl' = '300',\\n" +
" 'command'='HSET'\\n" +
" )";
tEnv.executeSql(source);
tEnv.executeSql(daeamon);
tEnv.executeSql(sink);
String sql =
" insert into sink\_students "
+ " select s.school,s.number,s.name,s.class\_id,d.class\_name from students s"
+ " left join classes for system\_time as of s.proctime as d on d.class\_id = s.class\_id and d.school = s.school";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start) + "ms");
}
配置项
描述
host
redis的host
port
redis的port
password
redis的password
cluster-nodes
redis的集群节点,ip和端口之间用英文冒号分隔,多个ip端口之间用英文逗号分隔
master.name
redis的sentinel模式的master节点的名称
sentinels.info
redis的sentinel模式的info信息
sentinels.password
redis的sentinel模式的密码
database
redis的database,一般是0~15
command
redis的命令,作为流表时支持BLPOP、BRPOP、LPOP、RPOP、SPOP;作为维表时支持GET、HGET;作为sink表时支持LPUSH、RPUSH、SADD、SET、HSET
redis-mode
redis的部署模式,single、cluster、sentinel
key
redis需要访问的key,比如数据是以某个固定的key存放在redis里,值是一个list;redis执行lpush、rpush、sadd、hset等sink使用的命令时的key;
timeout
连接redis的超时时间,单位毫秒
max-total
连接redis的连接池的最大连接数
max-idle
连接redis的连接池的最大空闲数
min-idle
连接redis的连接池的最小空闲数
format
格式化数据格式,如json、csv
batch-fetch-rows
像LPOP、BLPOP、RPOP、BRPOP这种命令每次从redis拿到数据的条数
lookup.cache.max-rows
作为维表lookup模式,缓存在内存中的数据的最大条数
lookup.cache.ttl
作为维表lookup模式,缓存在内存中的数据的ttl超时时间,单位秒
lookup.max-retries
作为维表lookup模式,查找数据的失败重试次数
lookup.cache.load-all
作为维表lookup模式,查找数据是否加载所有,主要是针对hget命令,如:HGET KEY_NAME FIELD_NAME;是否根据key查出所有field的值,这里可以根据实际hash表的大小决定是否要查询所有出来缓存起来
sink.max-retries
redis作为sink源时,最大重试次数
sink.parallelism
redis作为sink源时,sink的并行数,默认并行度为核心数
sink.key.ttl
redis作为sink源时,sink的数据保存在redis的ttl超时时间,单位秒,默认为-1表示长期保存
lookup.max-retries
作为维表lookup模式,查找数据的失败重试次数
手机扫一扫
移动阅读更方便
你可能感兴趣的文章