A Look at Flink's Table API and SQL Programs
Published: 2019-06-25



This article takes a look at Flink's Table API and SQL programs.

Example

```java
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// create a TableEnvironment
// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register a Table
tableEnv.registerTable("table1", ...);           // or
tableEnv.registerTableSource("table2", ...);     // or
tableEnv.registerExternalCatalog("extCat", ...);
// register an output Table
tableEnv.registerTableSink("outputTable", ...);

// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable");

// execute
env.execute();
```
  • This example shows the basic structure of a Flink Table API and SQL program

Table API example

```java
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register Orders table

// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// emit or convert Table
// execute query
```
  • tableEnv.scan creates a Table from the registered Orders table; the query is then built with Table methods such as filter, groupBy, and select
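To make the semantics of this query concrete, here is a plain-Java sketch (no Flink dependency) that applies the same filter, groupBy, and sum steps to a small in-memory list with java.util.stream. The Order class, the sample rows, and the "cID,cName" string used as the group key are assumptions made for illustration; only the field names come from the example. Flink itself runs the query as a distributed job whose result, in streaming mode, is continuously updated rather than computed once.

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical row type mirroring the Orders fields used in the query.
class Order {
    final int cID;
    final String cName;
    final String cCountry;
    final long revenue;

    Order(int cID, String cName, String cCountry, long revenue) {
        this.cID = cID;
        this.cName = cName;
        this.cCountry = cCountry;
        this.revenue = revenue;
    }
}

class RevenueSketch {
    // filter(cCountry === 'FRANCE') -> groupBy(cID, cName) -> select(cID, cName, revenue.sum)
    // Returns "cID,cName" -> revSum; a TreeMap keeps the output order deterministic.
    static Map<String, Long> revenueForFrance(List<Order> orders) {
        return orders.stream()
            .filter(o -> "FRANCE".equals(o.cCountry))
            .collect(Collectors.groupingBy(
                o -> o.cID + "," + o.cName,
                TreeMap::new,
                Collectors.summingLong(o -> o.revenue)));
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
            new Order(1, "Alice", "FRANCE", 100),
            new Order(1, "Alice", "FRANCE", 50),
            new Order(2, "Bob", "GERMANY", 70),
            new Order(3, "Chloe", "FRANCE", 30));
        System.out.println(revenueForFrance(orders)); // {1,Alice=150, 3,Chloe=30}
    }
}
```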

sqlQuery example

```java
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// emit or convert Table
// execute query
```
  • Internally, sqlQuery relies on Apache Calcite to parse, validate, and optimize the SQL query

sqlUpdate example (TableSink)

```java
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// execute query
```
  • After an output table has been registered with a TableSink, TableEnvironment's sqlUpdate method can write to it with an INSERT INTO statement

insertInto example (TableSink)

```java
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// create a TableSink; the second argument is the field delimiter
TableSink sink = new CsvTableSink("/path/to/file", "|");

// register the TableSink with a specific schema
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);

// compute a result Table using Table API operators and/or SQL queries
Table result = ...

// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");

// execute the program
```
  • Table.insertInto emits the result Table to the registered output table (here backed by a CsvTableSink)
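As a rough illustration of what ends up in the file, the plain-Java sketch below turns result rows into delimited lines using the same "|" delimiter and the a/b/c (INT, STRING, LONG) schema from the example. It is an approximation written for this article, not Flink's CsvTableSink code; representing rows as Object[] and writing a null field as an empty string are assumptions of this sketch.

```java
import java.util.*;

class CsvLineSketch {
    // Joins the fields of one row with the given delimiter.
    // Assumption of this sketch: null fields are written as empty strings.
    static String formatRow(Object[] row, String fieldDelim) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < row.length; i++) {
            if (i > 0) {
                sb.append(fieldDelim);
            }
            if (row[i] != null) {
                sb.append(row[i]);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Rows matching the schema a INT, b STRING, c LONG
        List<Object[]> rows = Arrays.asList(
            new Object[] {1, "hello", 10L},
            new Object[] {2, null, 20L});
        for (Object[] row : rows) {
            System.out.println(formatRow(row, "|"));
        }
        // prints:
        // 1|hello|10
        // 2||20
    }
}
```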

Converting between DataStream (or DataSet) and Table

Registering a DataStream as a Table

```java
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
```
  • StreamTableEnvironment.registerDataStream registers a DataStream as a Table; without explicit names the fields default to f0, f1, ...

Converting a DataStream into a Table

```java
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
```
  • StreamTableEnvironment.fromDataStream converts a DataStream directly into a Table object, optionally with explicit field names

Converting a Table into a DataStream

```java
// get StreamTableEnvironment.
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

// convert the Table into an append DataStream of Tuple2<String, Integer>
//   via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
  tableEnv.toAppendStream(table, tupleType);

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
//   The boolean field indicates the type of the change.
//   True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
  tableEnv.toRetractStream(table, Row.class);
```
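  • StreamTableEnvironment.toAppendStream converts a Table into an append-only DataStream, which only works if the Table is modified solely by inserts; toRetractStream converts it into a DataStream<Tuple2<Boolean, X>>, where true marks an INSERT and false a DELETE (retraction)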
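To see what the Boolean flag of a retract stream carries, here is a plain-Java simulation (not Flink code) of a continuously updated count per name, in the spirit of a hypothetical query such as SELECT name, COUNT(*) ... GROUP BY name. The Change class stands in for Tuple2<Boolean, Row> and is an assumption of this sketch. Whenever a key's count changes, the previously emitted result row is retracted (false) before the updated row is inserted (true); a result that updates earlier rows like this is exactly what an append stream cannot represent.

```java
import java.util.*;

// Hypothetical stand-in for Tuple2<Boolean, Row>: change flag plus (name, count).
class Change {
    final boolean isInsert;
    final String name;
    final long count;

    Change(boolean isInsert, String name, long count) {
        this.isInsert = isInsert;
        this.name = name;
        this.count = count;
    }

    @Override
    public String toString() {
        return "(" + isInsert + ", " + name + ", " + count + ")";
    }
}

class RetractStreamSketch {
    // Emits the retract-stream changes of a running COUNT(*) grouped by name.
    static List<Change> countByName(List<String> names) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String name : names) {
            Long old = counts.get(name);
            if (old != null) {
                // retract the previously emitted result row
                out.add(new Change(false, name, old));
            }
            long updated = (old == null) ? 1 : old + 1;
            counts.put(name, updated);
            // insert the updated result row
            out.add(new Change(true, name, updated));
        }
        return out;
    }

    public static void main(String[] args) {
        for (Change c : countByName(Arrays.asList("Alice", "Bob", "Alice"))) {
            System.out.println(c);
        }
        // prints:
        // (true, Alice, 1)
        // (true, Bob, 1)
        // (false, Alice, 1)
        // (true, Alice, 2)
    }
}
```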

Converting a Table into a DataSet

```java
// get BatchTableEnvironment
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple =
  tableEnv.toDataSet(table, tupleType);
```
  • BatchTableEnvironment.toDataSet converts a Table into a DataSet, either of Row or of a type given by a class or a TypeInformation

Mapping data types to a table schema

Position-based mapping (Tuple types)

```java
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// --- Tuple type ---
DataStream<Tuple2<Long, Integer>> stream = ...

// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with field names "myLong" and "myInt"
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
```
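  • Position-based mapping requires that the newly specified field names do not already exist in the input data type; otherwise Flink treats the mapping as name-based. If no names are specified, the type's default field names are used, i.e. f0, f1, ... for Tuple types. This mode works for Tuple and Row types; it cannot be used with POJO types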
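The rule that decides between renaming by position and selecting by name can be sketched in plain Java as follows. This is a simplified illustration, not Flink's implementation: it only handles a comma-separated list of plain names (no `as` aliases), it requires exactly one new name per input field in position-based mode (an assumption of this sketch), and the class and method names are hypothetical.

```java
import java.util.*;

class FieldMappingSketch {
    // Maps each selected input field to its name in the resulting table.
    // Simplified rule: position-based if none of the given names exists in the
    // input type, name-based otherwise. Aliases ("x as y") are not handled here.
    static Map<String, String> resolve(List<String> inputFields, String fields) {
        List<String> names = new ArrayList<>();
        for (String f : fields.split(",")) {
            names.add(f.trim());
        }
        boolean byPosition = true;
        for (String n : names) {
            if (inputFields.contains(n)) {
                byPosition = false;
            }
        }
        Map<String, String> mapping = new LinkedHashMap<>();
        if (byPosition) {
            // This sketch requires exactly one new name per input field.
            if (names.size() != inputFields.size()) {
                throw new IllegalArgumentException("expected " + inputFields.size() + " names");
            }
            for (int i = 0; i < names.size(); i++) {
                mapping.put(inputFields.get(i), names.get(i)); // rename by position
            }
        } else {
            for (String n : names) {
                if (!inputFields.contains(n)) {
                    throw new IllegalArgumentException("unknown field: " + n);
                }
                mapping.put(n, n); // select (and possibly reorder) by name
            }
        }
        return mapping;
    }

    public static void main(String[] args) {
        List<String> tuple2 = Arrays.asList("f0", "f1");
        System.out.println(resolve(tuple2, "myLong, myInt")); // {f0=myLong, f1=myInt}
        System.out.println(resolve(tuple2, "f1, f0"));        // {f1=f1, f0=f0}
    }
}
```

Mixing an existing name with a new one, such as "f0, myInt", switches the sketch to name-based mode and then fails on the unknown field, which mirrors why new names must not collide with the input's field names.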

Name-based mapping (POJO types)

```java
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// --- Tuple type ---
DataStream<Tuple2<Long, Integer>> stream = ...

// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with field "f1" only
Table table = tableEnv.fromDataStream(stream, "f1");

// convert DataStream into Table with swapped fields
Table table = tableEnv.fromDataStream(stream, "f1, f0");

// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");

// --- POJO type ---
// Person is a POJO with fields "name" and "age"
DataStream<Person> stream = ...

// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");

// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");

// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
```
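  • Both Tuple and POJO types support this mode; fields can be projected, reordered, and renamed with as. Note that a POJO's default field order is alphabetical by field name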
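A plain-Java sketch of name-based mapping for a POJO: the default schema is the POJO's fields sorted by name, and each expression either selects a field by name or renames it with `as`. The Person class, the reflection-based field lookup, and the simple regex parsing of `x as y` are assumptions of this sketch; they are not how Flink's type extraction or expression parser work internally.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.*;

// Hypothetical POJO with fields "name" and "age", as in the example above.
class Person {
    public String name;
    public int age;
}

class PojoMappingSketch {
    // Default schema of a POJO in this sketch: its non-static fields, ordered by name.
    static List<String> defaultFields(Class<?> pojo) {
        List<String> names = new ArrayList<>();
        for (Field f : pojo.getDeclaredFields()) {
            if (!Modifier.isStatic(f.getModifiers())) {
                names.add(f.getName());
            }
        }
        Collections.sort(names);
        return names;
    }

    // Name-based mapping with optional aliases, e.g. "age as myAge, name as myName".
    // Returns table field name -> POJO field name, in the order given.
    static Map<String, String> mapByName(Class<?> pojo, String exprs) {
        List<String> available = defaultFields(pojo);
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String expr : exprs.split(",")) {
            String[] parts = expr.trim().split("\\s+as\\s+");
            String source = parts[0].trim();
            String target = parts.length > 1 ? parts[1].trim() : source;
            if (!available.contains(source)) {
                throw new IllegalArgumentException("unknown field: " + source);
            }
            mapping.put(target, source);
        }
        return mapping;
    }

    public static void main(String[] args) {
        System.out.println(defaultFields(Person.class));                               // [age, name]
        System.out.println(mapByName(Person.class, "name as myName"));                 // {myName=name}
        System.out.println(mapByName(Person.class, "age as myAge, name as myName"));  // {myAge=age, myName=name}
    }
}
```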

Atomic types

```java
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Long> stream = ...

// convert DataStream into Table with default field name "f0"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with field name "myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");
```
  • An atomic type (e.g. Long or String) is converted into a table with a single field, named f0 by default

Row type

```java
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
DataStream<Row> stream = ...

// convert DataStream into Table with default field names "name", "age"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
Table table = tableEnv.fromDataStream(stream, "myName, myAge");

// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");

// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");

// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
```
  • The Row type supports an arbitrary number of fields and allows null field values; it can be used with both position-based and name-based mapping

Summary

The basic usage of Flink's Table API and SQL programs:

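  • First create a TableEnvironment (BatchTableEnvironment or StreamTableEnvironment). Then create Tables or TableSources and register them in a catalog (the internal catalog is used by default; an external catalog can also be registered). Then query the tables, and finally convert or emit the results
  • Tables can be created by converting a DataSet or DataStream. They can be queried with the Table API (starting with the scan method), with SQL (the sqlQuery method), or with a mix of both
  • A query result Table can be converted back into a DataSet or DataStream for further processing. To write the result to a table instead, register a TableSink and emit to it with TableEnvironment's sqlUpdate method or Table's insertInto method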


Reprinted from: https://my.oschina.net/go4it/blog/3003986
