Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。可以将 Doris 表映射为 DataStream 或者 Table。
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.13_2.12</artifactId>
    <version>1.0.3</version>
</dependency>

-- 切换到测试库
use test_db;
-- 创建测试表 flinktest
CREATE TABLE flinktest(
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");
-- 插入样例数据
insert into flinktest values(1,1,'jim',2),(2,1,'grace',2),(3,2,'tom',2),(4,3,'bush',3),(5,3,'helen',3);
-- 查看表数据情况
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      5 |        3 | helen    |    3 |
|      4 |        3 | bush     |    3 |
|      3 |        2 | tom      |    2 |
|      2 |        1 | grace    |    2 |
+--------+----------+----------+------+

Doris 与 Flink 类型映射:

| Doris Type | Flink Type |
|---|---|
| NULL_TYPE | NULL |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| DECIMAL | DECIMAL |
| CHAR | STRING |
| LARGEINT | STRING |
| VARCHAR | STRING |
| DECIMALV2 | DECIMAL |
| TIME | DOUBLE |
| HLL | Unsupported datatype |
代码示例:
package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Reads the Doris table {@code test_db.flinktest} as a DataStream and prints
 * each row (deserialized into a {@code List}) to stdout.
 */
public class Flink_stream_read_doris {

    public static void main(String[] args) throws Exception {
        // Pin the local Flink web UI to a fixed port for this demo.
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        // Connection settings: Doris FE endpoint, credentials, and source table.
        Properties props = new Properties();
        props.setProperty("fenodes", "hdt-dmcp-ops01:8130");
        props.setProperty("username", "root");
        props.setProperty("password", "123456");
        props.setProperty("table.identifier", "test_db.flinktest");

        env.addSource(new DorisSourceFunction(
                        new DorisStreamOptions(props),
                        new SimpleListDeserializationSchema()))
                .print();

        // Let failures propagate: the original caught Exception and only called
        // printStackTrace(), which makes the JVM exit 0 even when the job fails.
        env.execute();
    }
}
/* 代码控制台输出:[4, 3, bush, 3][2, 1, grace, 2][1, 1, jim, 2][5, 3, helen, 3][3, 2, tom, 2] */
Flink 读写 Doris 数据主要有两种方式
代码示例:
package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Writes one JSON record into the Doris table {@code test_db.flinktest}
 * through the Doris Stream Load sink.
 */
public class Flink_stream_write_doris_json {

    public static void main(String[] args) throws Exception {
        // Pin the local Flink web UI to a fixed port for this demo.
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        // Stream Load properties: payload is a JSON document; strip_outer_array
        // lets Doris accept a top-level JSON array as multiple rows.
        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("strip_outer_array", "true");

        env.fromElements("{\"siteid\":\"10\", \"citycode\": \"1001\",\"username\": \"ww\",\"pv\":\"100\"}")
                .addSink(DorisSink.sink(
                        new DorisExecutionOptions.Builder()
                                .setBatchIntervalMs(2000L)
                                .setEnableDelete(false)
                                .setMaxRetries(3)
                                .setStreamLoadProp(pro)
                                .build(),
                        new DorisOptions.Builder()
                                .setFenodes("hdt-dmcp-ops01:8130")
                                .setUsername("root")
                                .setPassword("123456")
                                .setTableIdentifier("test_db.flinktest")
                                .build()));

        // Let failures propagate: the original caught Exception and only called
        // printStackTrace(), which makes the JVM exit 0 even when the job fails.
        env.execute();
    }
}
/* 代码执行前: 5 rows select * from flinktest;+--------+----------+----------+------+| siteid | citycode | username | pv |+--------+----------+----------+------+| 1 | 1 | jim | 2 || 5 | 3 | helen | 3 || 4 | 3 | bush | 3 || 3 | 2 | tom | 2 || 2 | 1 | grace | 2 |+--------+----------+----------+------+
代码执行后: 6 rows select * from flinktest;+--------+----------+----------+------+| siteid | citycode | username | pv |+--------+----------+----------+------+| 2 | 1 | grace | 2 || 3 | 2 | tom | 2 || 5 | 3 | helen | 3 || 1 | 1 | jim | 2 || 10 | 1001 | ww | 100 || 4 | 3 | bush | 3 |+--------+----------+----------+------+ */
代码示例:
xxxxxxxxxxpackage com.zenitera.bigdata.doris;
import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import org.apache.doris.flink.cfg.DorisExecutionOptions;import org.apache.doris.flink.cfg.DorisOptions;import org.apache.doris.flink.cfg.DorisSink;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.data.GenericRowData;import org.apache.flink.table.data.StringData;import org.apache.flink.table.types.logical.*;
public class Flink_stream_write_doris_rowdata { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);
LogicalType[] types = {new IntType(), new SmallIntType(), new VarCharType(), new BigIntType()}; String[] fields = {"siteid", "citycode", "username", "pv"};
env .fromElements("{\"siteid\":\"100\", \"citycode\": \"1002\",\"username\": \"wang\",\"pv\":\"100\"}")
.map(json -> { JSONObject obj = JSON.parseObject(json); GenericRowData rowData = new GenericRowData(4); rowData.setField(0, obj.getIntValue("siteid")); rowData.setField(1, obj.getShortValue("citycode")); rowData.setField(2, StringData.fromString(obj.getString("username"))); rowData.setField(3, obj.getLongValue("pv")); return rowData;
})
.addSink(DorisSink.sink( fields, types, new DorisExecutionOptions.Builder() .setBatchIntervalMs(2000L) .setEnableDelete(false) .setMaxRetries(3) .build(), new DorisOptions.Builder() .setFenodes("hdt-dmcp-ops01:8130") .setUsername("root") .setPassword("123456") .setTableIdentifier("test_db.flinktest") .build()) );
try { env.execute(); } catch (Exception e) { e.printStackTrace(); }
}}
/* 代码执行前: 6 rows select * from flinktest;+--------+----------+----------+------+| siteid | citycode | username | pv |+--------+----------+----------+------+| 2 | 1 | grace | 2 || 3 | 2 | tom | 2 || 5 | 3 | helen | 3 || 1 | 1 | jim | 2 || 10 | 1001 | ww | 100 || 4 | 3 | bush | 3 |+--------+----------+----------+------+
代码执行后: 7 rows select * from flinktest;+--------+----------+----------+------+| siteid | citycode | username | pv |+--------+----------+----------+------+| 1 | 1 | jim | 2 || 2 | 1 | grace | 2 || 3 | 2 | tom | 2 || 5 | 3 | helen | 3 || 10 | 1001 | ww | 100 || 100 | 1002 | wang | 100 || 4 | 3 | bush | 3 |+--------+----------+----------+------+ */
Doris测试表:
use test_db;
truncate table flinktest;
insert into flinktest values(1,1,'aaa',1),(2,2,'bbb',2),(3,3,'ccc',3);
select * from flinktest;+--------+----------+----------+------+| siteid | citycode | username | pv |+--------+----------+----------+------+| 2 | 2 | bbb | 2 || 1 | 1 | aaa | 1 || 3 | 3 | ccc | 3 |+--------+----------+----------+------+3 rows in set (0.01 sec)
Flink-SQL代码示例:
xxxxxxxxxxpackage com.zenitera.bigdata.doris;
import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class Flink_SQL_doris { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql("create table flink_0518(" + " siteid int, " + " citycode int, " + " username string, " + " pv bigint " + ")with(" + " 'connector' = 'doris', " + " 'fenodes' = 'hdt-dmcp-ops01:8130', " + " 'table.identifier' = 'test_db.flinktest', " + " 'username' = 'root', " + " 'password' = '123456' " + ")");
tEnv.executeSql("insert into flink_0518(siteid, citycode, username, pv) values(4, 4, 'wangting', 4) ");
}
public static class Flink_0518 { private Integer siteid; private Integer citycode; private String username; private Long pv; }}执行代码,执行完成后查看Doris对应表数据进行验证:
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      3 |        3 | ccc      |    3 |
|      2 |        2 | bbb      |    2 |
|      1 |        1 | aaa      |    1 |
|      4 |        4 | wangting |    4 |
+--------+----------+----------+------+
4 rows in set (0.01 sec)
xxxxxxxxxxpackage com.zenitera.bigdata.doris;
import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class Flink_SQL_doris_read { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql("create table flink_0520(" + " siteid int, " + " citycode SMALLINT, " + " username string, " + " pv bigint " + ")with(" + " 'connector' = 'doris', " + " 'fenodes' = 'hdt-dmcp-ops01:8130', " + " 'table.identifier' = 'test_db.flinktest', " + " 'username' = 'root', " + " 'password' = '123456' " + ")");
tEnv.sqlQuery("select * from flink_0520").execute().print();
}}
/* 控制台输出信息:+----+-------------+----------+---------------+---------+| op | siteid | citycode | username | pv |+----+-------------+----------+---------------+---------+| +I | 1 | 1 | aaa | 1 || +I | 3 | 3 | ccc | 3 || +I | 2 | 2 | bbb | 2 || +I | 4 | 4 | wangting | 4 |+----+-------------+----------+---------------+---------+4 rows in set*/