Custom Parser SDK

Kylin supports custom parsing of Kafka data through the SDK.

Setting up the development environment

Create a Project

  • Use Maven to manage project dependencies

Modify the pom.xml file



Import SDK

Copy ${KYLIN_HOME}/server/jars/kylin-streaming-sdk-{version}.jar from your Kylin installation.

Create a lib directory in the project root directory and put the SDK JAR into it.

Load the SDK dependencies into the project



Implement a custom parser

  • Create a new parser class, for example XXXParser, that extends AbstractDataParser<ByteBuffer>.
  • Override parse(ByteBuffer input) to parse a single record and return a Map<field name, field value>.
  • If the parser needs one-time initialization when the parser class is instantiated, do it in a no-argument constructor.
  • Override before() if some initialization is required before each record is parsed.
  • If an exception is thrown while a record is being parsed, that record is treated as dirty data and is skipped during the build.
  • Override after() to check the data after each record has been processed.
  • Create the file ${project.basedir}/src/main/resources/META-INF/services/org.apache.kylin.parser.AbstractDataParser and list in it the fully qualified class name of each parser class, one per line.

Demo Parser


Parse JSON

Input sample data

{
  "name": "Li",
  "sex": "man",
  "age": 24,
  "addr": {
    "country": "China",
    "city": "Shanghai",
    "region": "YangPu"
  },
  "works": [
    "work_1",
    "work_2",
    "work_3"
  ],
  "create_time": "2022-11-01 08:00:00",
  "update_time": "2022-11-20 12:00:00"
}

Output parsed data

{
  "name": "Li",
  "sex": "man",
  "age": 24,
  "addr_country": "China",
  "addr_city": "Shanghai",
  "addr_region": "YangPu",
  "first_works": "work_1",
  "create_time": "2022-11-01 08:00:00",
  "update_time": "2022-11-20 12:00:00",
  "process_time": "2022-11-20 13:00:00"
}

The parser code

package org.apache.kylin.parser.json;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.kylin.parser.AbstractDataParser;
import org.apache.kylin.parser.utils.ParserBenchMark;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * Demo custom parser: turns one JSON-encoded record into a flat map of output fields.
 * NOTE(review): in this copy of the sample the annotations and closing braces of the class
 * and its members appear to be missing (lombok.Data, lombok.Getter and JsonProperty are
 * imported but never visibly used) — TODO restore them before compiling.
 */
public class JsonCustomParser extends AbstractDataParser<ByteBuffer> {

// Sample record that main() feeds to the parser and to the benchmarks.
private static final String JSON_INPUT_STR = "{\"name\": \"Li\",\"sex\": \"man\",\"age\": 24,\"addr\": {\"country\": \"China\",\"city\": \"Shanghai\",\"region\": \"YangPu\"},\"works\": [\"work_1\",\"work_2\",\"work_3\"],\"create_time\": \"2022-11-01 08:00:00\",\"update_time\": \"2022-11-20 12:00:00\"}";
// Shared Jackson mapper: reads the input record and converts the output entry into a Map.
private static final ObjectMapper MAPPER = new ObjectMapper();

protected Map<String, Object> parse(ByteBuffer buffer) {
JsonInputEntry inputEntry = MAPPER.readValue(buffer.array(), JsonInputEntry.class);
JsonOutputEntry outputEntry = JsonOutputEntry.transform(inputEntry);
return MAPPER.convertValue(outputEntry, Map.class);

/**
 * Demo entry point: obtains the parser via AbstractDataParser.getDataParser(...), parses
 * JSON_INPUT_STR once, then runs the ParserBenchMark helpers over the same buffer.
 */
public static void main(String[] args) {
// get parser
// NOTE(review): the statement below is cut off after its first argument in this copy of the
// sample — TODO restore the remaining argument(s) and the closing ");".
AbstractDataParser<ByteBuffer> dataParser = AbstractDataParser.getDataParser(JsonCustomParser.class.getName(),
// parse
ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode(JSON_INPUT_STR);
// NOTE(review): resultMap is never used afterwards; presumably it was meant to be printed — confirm.
Map<String, Object> resultMap = dataParser.process(byteBuffer);
// parser BenchMark
// Each line prints the cost (labelled "ms" by the format string) returned by a ParserBenchMark helper.
System.out.printf("parser 20k data, cost: %s ms \n", ParserBenchMark.test20K(byteBuffer, dataParser));
System.out.printf("parser 40k data, cost: %s ms \n", ParserBenchMark.test40K(byteBuffer, dataParser));
System.out.printf("parser 60k data, cost: %s ms \n", ParserBenchMark.test60K(byteBuffer, dataParser));
System.out.printf("parser 999999 data, cost: %s ms \n", ParserBenchMark.testWithSize(byteBuffer, dataParser, 999999));

/**
 * Jackson binding target for one input JSON record (see JSON_INPUT_STR for its shape).
 * NOTE(review): JsonOutputEntry.transform() calls getWorks() on this class, which is not
 * declared here; presumably Lombok annotations (lombok.Data / lombok.Getter are imported)
 * were lost from this copy. The input keys "create_time" and "update_time" also need an
 * explicit mapping (e.g. @JsonProperty) to reach createTime / updateTime with a default
 * ObjectMapper. Lists is not among the imports shown for this sample — TODO confirm (Guava?).
 */
public static class JsonInputEntry {
private String name;
private String sex;
private int age;
// The nested "addr" object.
private Addr addr;
// Elements of the "works" array; starts out as an empty list.
private final List<String> works = Lists.newArrayList();
private String createTime;
private String updateTime;

/** Binding target for the nested "addr" object (country, city, region). */
public static class Addr {
private String country;
private String city;
private String region;

/**
 * Flattened output record; parse() converts it into the returned Map via MAPPER.convertValue(...).
 * NOTE(review): the setters used by transform() are not declared here (presumably Lombok,
 * lost from this copy). transform() as shown only fills firstWork and processTime, while the
 * sample output also lists name, sex, age, addr_country, addr_city, addr_region, create_time
 * and update_time — TODO confirm and restore the missing copies and @JsonProperty names.
 */
public static class JsonOutputEntry {
private String name;
private String sex;
private int age;
private String addrCountry;
private String addrCity;
private String addrRegion;
private String firstWork;
private String createTime;
private String updateTime;
private String processTime;

/**
 * Builds the output record for one input record.
 * firstWork is the first element of works, or null when works is empty; processTime is the
 * current time formatted as "yyyy-MM-dd HH:mm:ss" by commons-lang DateFormatUtils.
 *
 * @param inputEntry the parsed input record
 * @return the populated output record
 */
public static JsonOutputEntry transform(JsonInputEntry inputEntry) {
JsonOutputEntry outputEntry = new JsonOutputEntry();
outputEntry.setFirstWork(inputEntry.getWorks().isEmpty() ? null : inputEntry.getWorks().get(0));
outputEntry.setProcessTime(DateFormatUtils.format(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
return outputEntry;

Parse CSV

This example uses '|' as the CSV separator.

Input sample data

1|Li|"deve|loper"|Table tennis|2022-11-01 08:00:00|2022-11-02 08:00:00

Output parsed data

{
  "id" : 1,
  "name" : "Li",
  "job" : "deve|loper",
  "sport" : "Table tennis",
  "create_time" : "2022-11-01 08:00:00",
  "delete_time" : "2022-11-02 08:00:00",
  "process_time" : "2022-11-21 16:28:05"
}

The parser code

package org.apache.kylin.parser.csv;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import lombok.Data;
import lombok.SneakyThrows;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.kylin.parser.AbstractDataParser;
import org.apache.kylin.parser.utils.ParserBenchMark;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * Demo custom parser: turns one '|'-separated CSV line into a flat map of output fields.
 */
public class CsvCustomParser extends AbstractDataParser<ByteBuffer> {

    // Sample record used by main(); the quoted third column contains the separator itself.
    private static final String CSV_INPUT_STR = "1|Li|\"deve|loper\"|Table tennis|2022-11-01 08:00:00|2022-11-02 08:00:00";
    // Shared Jackson mapper used to convert the output entry into the returned Map.
    private static final ObjectMapper MAPPER = new ObjectMapper();
    // '|' separates columns. '"' is the quote character, so a quoted field such as
    // "deve|loper" stays a single column even though it contains the separator.
    private static final CSVParser csvParser = new CSVParserBuilder()
            .withSeparator('|')
            .withQuoteChar('"')
            .build();

protected Map<String, Object> parse(ByteBuffer buffer) {
try (StringReader reader = new StringReader(StandardCharsets.UTF_8.decode(buffer).toString());
CSVReader csvReader = new CSVReaderBuilder(reader).withCSVParser(csvParser).build()) {
List<String> line = Lists.newArrayList(csvReader.readNext());
if (line.isEmpty()) {
return Maps.newHashMap();
CsvOutputEntry outputEntry = CsvOutputEntry.transform(line);
return MAPPER.convertValue(outputEntry, Map.class);
} catch (IOException e) {
throw new RuntimeException(e);

/**
 * Demo entry point: obtains the parser via AbstractDataParser.getDataParser(...), parses
 * CSV_INPUT_STR once, then runs the ParserBenchMark helpers over the same buffer.
 */
public static void main(String[] args) {
// get parser
// NOTE(review): the statement below is cut off after its first argument in this copy of the
// sample — TODO restore the remaining argument(s) and the closing ");". The JSON demo declares
// this variable as AbstractDataParser<ByteBuffer>; <Object> here looks inconsistent — confirm.
AbstractDataParser<Object> dataParser = AbstractDataParser.getDataParser(CsvCustomParser.class.getName(),
// parse
ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode(CSV_INPUT_STR);
// NOTE(review): resultMap is never used afterwards; presumably it was meant to be printed — confirm.
Map<String, Object> resultMap = dataParser.process(byteBuffer);
// parser BenchMark
// Each line prints the cost (labelled "ms" by the format string) returned by a ParserBenchMark helper.
System.out.printf("parser 20k data, cost: %s ms \n", ParserBenchMark.test20K(byteBuffer, dataParser));
System.out.printf("parser 40k data, cost: %s ms \n", ParserBenchMark.test40K(byteBuffer, dataParser));
System.out.printf("parser 60k data, cost: %s ms \n", ParserBenchMark.test60K(byteBuffer, dataParser));
System.out.printf("parser 999999 data, cost: %s ms \n", ParserBenchMark.testWithSize(byteBuffer, dataParser, 999999));

/**
 * Output record for one CSV line; parse() converts it into the returned Map via MAPPER.convertValue(...).
 * The field order mirrors the column order of the sample input
 * (id | name | job | sport | create time | delete time); presumably transform() maps columns
 * by position — confirm.
 * NOTE(review): as shown, transform() only sets processTime, and the setter it uses is not
 * declared here (presumably Lombok, lost from this copy) — TODO restore the per-column copies.
 */
public static class CsvOutputEntry {
private long id;
private String name;
private String job;
private String sport;
private String createTime;
private String deleteTime;
private String processTime;

/**
 * Builds the output record for one parsed CSV line.
 * processTime is the current time formatted as "yyyy-MM-dd HH:mm:ss" by commons-lang DateFormatUtils.
 *
 * @param line the fields of one CSV record, in column order
 * @return the populated output record
 */
public static CsvOutputEntry transform(List<String> line) {
CsvOutputEntry outputEntry = new CsvOutputEntry();
outputEntry.setProcessTime(DateFormatUtils.format(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
return outputEntry;

Project Package

mvn clean package -DskipTests

This generates ${project.basedir}/target/custom-parser-demo.jar.

Upload a custom parser to the system

Create Kafka tables using a custom parser
