Skip to main content
Version: 2.4.0

Apache Flink

Introduction

This document describes how to use Kylin as a data source in Apache Flink;

There were several attempts to do this in Scala and JDBC, but none of them works:

We will try use CreateInput and JDBCInputFormat in batch mode and access via JDBC to Kylin. But it isn’t implemented in Scala, is only in Java MailList. This doc will go step by step solving these problems.

Pre-requisites

Used software:

Starting point:

This can be out initial skeleton:

import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("org.apache.kylin.jdbc.Driver")
.setDBUrl("jdbc:kylin://172.17.0.2:7070/learn_kylin")
.setUsername("ADMIN")
.setPassword("KYLIN")
.setQuery("select count(distinct seller_id) as sellers from kylin_sales group by part_dt order by part_dt")
.finish()
val dataset =env.createInput(inputFormat)

The first error is: alt text

Add to Scala:

import org.apache.flink.api.java.io.jdbc.JDBCInputFormat

Next error is alt text

We can solve dependencies (mvn repository: jdbc); Add this to your pom.xml:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc</artifactId>
<version>${flink.version}</version>
</dependency>

Solve dependencies of row

Similar to previous point we need solve dependencies of Row Class (mvn repository: Table) :

  • In pom.xml add:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
  • In Scala:
import org.apache.flink.api.table.Row

Solve RowTypeInfo property (and their new dependencies)

This is the new error to solve:

val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("org.apache.kylin.jdbc.Driver")
.setDBUrl("jdbc:kylin://172.17.0.2:7070/learn_kylin")
.setUsername("ADMIN")
.setPassword("KYLIN")
.setQuery("select count(distinct seller_id) as sellers from kylin_sales group by part_dt order by part_dt")
.setRowTypeInfo(DB_ROWTYPE)
.finish()
  • How can configure this property in Scala? In Attempt4, there is an incorrect solution

    We can check the types using the intellisense: alt text

    Then we will need add more dependences; Add to scala:

import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}

Create a Array or Seq of TypeInformation[ ]

Solution:

   var stringColum: TypeInformation[String] = createTypeInformation[String]
val DB_ROWTYPE = new RowTypeInfo(Seq(stringColum))

Solve ClassNotFoundException

Need find the kylin-jdbc-x.x.x.jar and then expose to Flink

  1. Find the Kylin JDBC jar

    From Kylin Download choose Binary and the correct version of Kylin and HBase

    Download & Unpack: in ./lib:

  1. Make this JAR accessible to Flink

    If you execute like service you need put this JAR in you Java class path using your .bashrc

Check the actual value: alt text

Check the permission for this file (Must be accessible for you):

If you are executing from IDE, need add your class path manually:

On IntelliJ: alt text > alt text > alt text > alt text

The result, will be similar to: alt text

Solve "Couldn’t access resultSet" error

It is related with Flink 4108 (MailList) and Timo Walther make a PR

If you are running Flink <= 1.2 you will need apply this path and make clean install

Solve the casting error

In the error message you have the problem and solution …. nice ;) ¡¡

The result

The output must be similar to this, print the result of query by standard output:

Now, more complex

Try with a multi-colum and multi-type query:

select part_dt, sum(price) as total_selled, count(distinct seller_id) as sellers 
from kylin_sales
group by part_dt
order by part_dt

Need changes in DB_ROWTYPE:

And import lib of Java, to work with Data type of Java alt text

The new result will be:

Error: Reused Connection

Check if your HBase and Kylin is working. Also you can use Kylin UI for it.

Error: java.lang.AbstractMethodError: ….Avatica Connection

See Kylin 1898

It is a problem with kylin-jdbc-1.x.x. JAR, you need use Calcite 1.8 or above; The solution is to use Kylin 1.5.4 or above.

Error: can't expand macros compiled by previous versions of scala

Is a problem with versions of scala, check in with "scala -version" your actual version and choose your correct POM.

Perhaps you will need a IntelliJ > File > Invalidates Cache > Invalidate and Restart.

I added POM for Scala 2.11

Final Words

Now you can read Kylin’s data from Apache Flink, great!

Full Code Example

Solved all integration problems, and tested with different types of data (Long, BigDecimal and Dates). The patch has been comited at 15 Oct, then, will be part of Flink 1.2.