Custom Parser SDK
Kylin supports custom parsing of Kafka data through the SDK.
Setting up the development environment
Create a Project
- Use Maven to manage project dependencies

Modify the pom.xml file
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.10.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.3.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <!-- Shaded jar is written as target/${project.artifactId}.jar.
                             ${artifactId} alone is a deprecated expression in Maven 3. -->
                        <finalName>${project.artifactId}</finalName>
                        <artifactSet>
                            <!-- Artifacts left out of the shaded parser jar
                                 (presumably already on Kylin's classpath; verify for your Kylin version). -->
                            <excludes>
                                <exclude>org.apache.kylin:kylin-streaming-sdk</exclude>
                                <exclude>org.projectlombok:*</exclude>
                                <exclude>org.apache.commons:*</exclude>
                                <exclude>com.fasterxml.jackson.core:*</exclude>
                                <exclude>com.google.guava:*</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <!-- log4j-slf4j-impl (declared in the dependencies) must not be bundled either. -->
                                <exclude>org.apache.logging.log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <!-- Drop signature files of signed dependencies; they would not
                                         match the contents of the repackaged jar. -->
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                    <exclude>javax/annotation/**</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Import SDK
Copy ${KYLIN_HOME}/server/jars/kylin-streaming-sdk-{version}.jar.
Create a lib directory in the project root directory and put the SDK jar into it.

Load the SDK dependencies into the project
<properties>
    <!-- Version label for the system-scoped SDK dependency below; the jar itself is taken from systemPath. -->
    <noop.version>1</noop.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
    <!-- Kylin streaming SDK copied into ./lib; replace {version} with the version of the copied jar. -->
    <dependency>
        <groupId>org.apache.kylin</groupId>
        <artifactId>kylin-streaming-sdk</artifactId>
        <version>${noop.version}</version>
        <scope>system</scope>
        <systemPath>${project.basedir}/lib/kylin-streaming-sdk-{version}.jar</systemPath>
    </dependency>
    <!-- SLF4J binding backed by Log4j 2. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.18.0</version>
    </dependency>
    <!-- Apache Commons Lang utilities. -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.10</version>
    </dependency>
</dependencies>
Implement a custom parser
- Create a parser class (for example `XXXParser`) that extends `AbstractDataParser<ByteBuffer>`.
- Override `parse(ByteBuffer input)`: parse a single record in this method and return a `Map<field name, field value>`.
- If the parser needs one-time initialization when an instance is created, do it in the parameterless (no-argument) constructor.
- Override `before()` if an initialization action is required before each record is parsed.
- If an exception is thrown while parsing a record, that record is treated as dirty data during the build and is skipped.
- Override `after()` to check the data after each record has been processed.
- Create the file `${project.basedir}/src/main/resources/META-INF/services/org.apache.kylin.parser.AbstractDataParser` and write the fully qualified class name of each parser class into it, one per line.

Demo Parser
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.apache.kylin</groupId>
    <artifactId>custom-parser-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <noop.version>1</noop.version>
    </properties>
    <dependencies>
        <!-- SDK -->
        <dependency>
            <groupId>org.apache.kylin</groupId>
            <artifactId>kylin-streaming-sdk</artifactId>
            <version>${noop.version}</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/lib/kylin-streaming-sdk-5.0.0-SNAPSHOT.jar</systemPath>
        </dependency>
        <!-- Json -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.14.0</version>
        </dependency>
        <!-- CSV -->
        <dependency>
            <groupId>com.opencsv</groupId>
            <artifactId>opencsv</artifactId>
            <version>5.7.1</version>
            <exclusions>
                <exclusion>
                    <groupId>commons-collections</groupId>
                    <artifactId>commons-collections</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Others -->
        <!-- Lombok is an annotation processor needed only at compile time, hence provided scope. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>31.1-jre</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.18.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.10</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.10.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.3.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- Produces target/custom-parser-demo.jar; ${artifactId} alone is deprecated in Maven 3. -->
                            <finalName>${project.artifactId}</finalName>
                            <artifactSet>
                                <!-- Artifacts kept out of the shaded parser jar. -->
                                <excludes>
                                    <exclude>org.apache.kylin:kylin-streaming-sdk</exclude>
                                    <exclude>org.projectlombok:*</exclude>
                                    <exclude>org.apache.commons:*</exclude>
                                    <exclude>com.fasterxml.jackson.core:*</exclude>
                                    <exclude>com.google.guava:*</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!-- Signature files of signed dependencies would not match the repackaged jar. -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>javax/annotation/**</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Parse JSON
Input sample data
{
  "name": "Li",
  "sex": "man",
  "age": 24,
  "addr": {
    "country": "China",
    "city": "Shanghai",
    "region": "YangPu"
  },
  "works": [
    "work_1",
    "work_2",
    "work_3"
  ],
  "create_time": "2022-11-01 08:00:00",
  "update_time": "2022-11-20 12:00:00"
}
Output parsed data
{
  "name": "Li",
  "sex": "man",
  "age": 24,
  "addr_country": "China",
  "addr_city": "Shanghai",
  "addr_region": "YangPu",
  "first_work": "work_1",
  "create_time": "2022-11-01 08:00:00",
  "update_time": "2022-11-20 12:00:00",
  "process_time": "2022-11-20 13:00:00"
}
The parser code
package org.apache.kylin.parser.json;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import lombok.Data;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.kylin.parser.AbstractDataParser;
import org.apache.kylin.parser.utils.ParserBenchMark;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
/**
 * Demo parser that flattens one nested JSON record ({@link JsonInputEntry}) into the flat
 * column layout of {@link JsonOutputEntry}.
 */
public class JsonCustomParser extends AbstractDataParser<ByteBuffer> {
    // Sample record used by main() to demo and benchmark the parser.
    private static final String JSON_INPUT_STR = "{\"name\": \"Li\",\"sex\": \"man\",\"age\": 24,\"addr\": {\"country\": \"China\",\"city\": \"Shanghai\",\"region\": \"YangPu\"},\"works\": [\"work_1\",\"work_2\",\"work_3\"],\"create_time\": \"2022-11-01 08:00:00\",\"update_time\": \"2022-11-20 12:00:00\"}";
    // Single mapper reused for every record rather than created per call.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Parses a single JSON record.
     *
     * @param buffer UTF-8 encoded JSON. Only the bytes between the buffer's position and limit
     *               are read, and the buffer's position is left unchanged.
     * @return field name to field value, keyed by the {@code @JsonProperty} names of {@link JsonOutputEntry}
     */
    @SneakyThrows
    @Override
    protected Map<String, Object> parse(ByteBuffer buffer) {
        JsonInputEntry inputEntry;
        if (buffer.hasArray()) {
            // buffer.array() on its own also covers bytes outside [position, limit), e.g. the spare
            // capacity Charset.encode() may leave at the end of its backing array, and it ignores
            // arrayOffset for sliced buffers. Parse exactly the readable window instead.
            inputEntry = MAPPER.readValue(buffer.array(), buffer.arrayOffset() + buffer.position(),
                    buffer.remaining(), JsonInputEntry.class);
        } else {
            // Direct and read-only buffers expose no array: copy via a duplicate so the caller's
            // position is not moved.
            ByteBuffer view = buffer.duplicate();
            byte[] bytes = new byte[view.remaining()];
            view.get(bytes);
            inputEntry = MAPPER.readValue(bytes, JsonInputEntry.class);
        }
        JsonOutputEntry outputEntry = JsonOutputEntry.transform(inputEntry);
        return MAPPER.convertValue(outputEntry, Map.class);
    }

    /** Demo entry point: parses the sample record, prints it and runs the SDK benchmarks. */
    @SneakyThrows
    public static void main(String[] args) {
        // get parser
        AbstractDataParser<ByteBuffer> dataParser = AbstractDataParser.getDataParser(JsonCustomParser.class.getName(),
                Thread.currentThread().getContextClassLoader());
        // parse
        ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode(JSON_INPUT_STR);
        Map<String, Object> resultMap = dataParser.process(byteBuffer);
        System.out.println(MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(resultMap));
        // parser BenchMark
        System.out.printf("parser 20k data, cost: %s ms \n", ParserBenchMark.test20K(byteBuffer, dataParser));
        System.out.printf("parser 40k data, cost: %s ms \n", ParserBenchMark.test40K(byteBuffer, dataParser));
        System.out.printf("parser 60k data, cost: %s ms \n", ParserBenchMark.test60K(byteBuffer, dataParser));
        System.out.printf("parser 999999 data, cost: %s ms \n", ParserBenchMark.testWithSize(byteBuffer, dataParser, 999999));
    }

    /** Shape of the incoming JSON record. */
    @Data
    public static class JsonInputEntry {
        private String name;
        private String sex;
        private int age;
        private Addr addr;
        // Final list with no setter generated; Jackson populates it during deserialization.
        @Getter
        private final List<String> works = Lists.newArrayList();
        @JsonProperty("create_time")
        private String createTime;
        @JsonProperty("update_time")
        private String updateTime;
    }

    /** Nested "addr" object of the input record. */
    @Data
    public static class Addr {
        private String country;
        private String city;
        private String region;
    }

    /** Flat output row; the {@code @JsonProperty} names become the output column names. */
    @Data
    public static class JsonOutputEntry {
        private String name;
        private String sex;
        private int age;
        @JsonProperty("addr_country")
        private String addrCountry;
        @JsonProperty("addr_city")
        private String addrCity;
        @JsonProperty("addr_region")
        private String addrRegion;
        @JsonProperty("first_work")
        private String firstWork;
        @JsonProperty("create_time")
        private String createTime;
        @JsonProperty("update_time")
        private String updateTime;
        @JsonProperty("process_time")
        private String processTime;

        /**
         * Flattens an input record: "addr" fields become addr_* columns, only the first entry of
         * "works" is kept, and process_time is set to the current local time.
         *
         * @param inputEntry deserialized input record
         * @return the flattened output row
         */
        public static JsonOutputEntry transform(JsonInputEntry inputEntry) {
            JsonOutputEntry outputEntry = new JsonOutputEntry();
            outputEntry.setName(inputEntry.getName());
            outputEntry.setSex(inputEntry.getSex());
            outputEntry.setAge(inputEntry.getAge());
            // A record without an "addr" object leaves the addr_* columns null instead of failing with an NPE.
            Addr addr = inputEntry.getAddr();
            if (addr != null) {
                outputEntry.setAddrCountry(addr.getCountry());
                outputEntry.setAddrCity(addr.getCity());
                outputEntry.setAddrRegion(addr.getRegion());
            }
            // "works": null in the input can leave the list null, so guard it as well as the empty case.
            List<String> works = inputEntry.getWorks();
            outputEntry.setFirstWork(works == null || works.isEmpty() ? null : works.get(0));
            outputEntry.setCreateTime(inputEntry.getCreateTime());
            outputEntry.setUpdateTime(inputEntry.getUpdateTime());
            outputEntry.setProcessTime(DateFormatUtils.format(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
            return outputEntry;
        }
    }
}
Parse CSV
This example uses '|' as the CSV separator.
Input sample data
1|Li|"deve|loper"|Table tennis|2022-11-01 08:00:00|2022-11-02 08:00:00
Output parsed data
{
  "id" : 1,
  "name" : "Li",
  "job" : "Table tennis",
  "sport" : "deve|loper",
  "create_time" : "2022-11-01 08:00:00",
  "delete_time" : "2022-11-02 08:00:00",
  "process_time" : "2022-11-21 16:28:05"
}
The parser code
package org.apache.kylin.parser.csv;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import lombok.Data;
import lombok.SneakyThrows;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.kylin.parser.AbstractDataParser;
import org.apache.kylin.parser.utils.ParserBenchMark;
import java.io.IOException;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
/**
 * Demo parser for one '|'-separated CSV line per record; see {@link CsvOutputEntry} for the
 * column layout.
 */
public class CsvCustomParser extends AbstractDataParser<ByteBuffer> {
    // Sample record used by main() to demo and benchmark the parser.
    private static final String CSV_INPUT_STR = "1|Li|\"deve|loper\"|Table tennis|2022-11-01 08:00:00|2022-11-02 08:00:00";
    private static final ObjectMapper MAPPER = new ObjectMapper();
    /**
     * Line parser: '|' separator, double-quote quoting, backslash escaping.
     * Held per parser instance rather than in a static field: opencsv's CSVParser keeps state
     * between calls (e.g. pending text of an unterminated quoted field), so one JVM-wide instance
     * could be corrupted if several parser instances run in parallel.
     * NOTE(review): if Kylin shares a single parser instance across threads, more isolation is needed.
     */
    private final CSVParser csvParser = new CSVParserBuilder()
            .withSeparator('|')
            .withQuoteChar('"')
            .withEscapeChar('\\')
            .build();

    /**
     * Parses a single CSV line.
     *
     * @param buffer UTF-8 encoded CSV line. It is decoded through a duplicate, so the caller's
     *               position is not consumed and the same buffer can be parsed again.
     * @return field name to field value, or an empty map when the buffer holds no CSV line
     */
    @SneakyThrows
    @Override
    protected Map<String, Object> parse(ByteBuffer buffer) {
        String text = StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
        try (StringReader reader = new StringReader(text);
             CSVReader csvReader = new CSVReaderBuilder(reader).withCSVParser(csvParser).build()) {
            String[] fields = csvReader.readNext();
            // readNext() returns null when there is no line at all; passing that to
            // Lists.newArrayList would throw a NullPointerException.
            if (fields == null || fields.length == 0) {
                return Maps.newHashMap();
            }
            CsvOutputEntry outputEntry = CsvOutputEntry.transform(Lists.newArrayList(fields));
            return MAPPER.convertValue(outputEntry, Map.class);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Demo entry point: parses the sample record, prints it and runs the SDK benchmarks. */
    @SneakyThrows
    public static void main(String[] args) {
        // get parser
        AbstractDataParser<ByteBuffer> dataParser = AbstractDataParser.getDataParser(CsvCustomParser.class.getName(),
                Thread.currentThread().getContextClassLoader());
        // parse
        ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode(CSV_INPUT_STR);
        Map<String, Object> resultMap = dataParser.process(byteBuffer);
        System.out.println(MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(resultMap));
        // parser BenchMark
        System.out.printf("parser 20k data, cost: %s ms \n", ParserBenchMark.test20K(byteBuffer, dataParser));
        System.out.printf("parser 40k data, cost: %s ms \n", ParserBenchMark.test40K(byteBuffer, dataParser));
        System.out.printf("parser 60k data, cost: %s ms \n", ParserBenchMark.test60K(byteBuffer, dataParser));
        System.out.printf("parser 999999 data, cost: %s ms \n", ParserBenchMark.testWithSize(byteBuffer, dataParser, 999999));
    }

    /** Flat output row; the {@code @JsonProperty} names become the output column names. */
    @Data
    public static class CsvOutputEntry {
        // Number of CSV columns transform() reads.
        private static final int EXPECTED_COLUMNS = 6;

        private long id;
        private String name;
        private String job;
        private String sport;
        @JsonProperty("create_time")
        private String createTime;
        @JsonProperty("delete_time")
        private String deleteTime;
        @JsonProperty("process_time")
        private String processTime;

        /**
         * Maps CSV columns by position: 0 = id, 1 = name, 2 = sport, 3 = job, 4 = create_time,
         * 5 = delete_time; process_time is set to the current local time.
         *
         * @param line the parsed CSV fields
         * @return the output row
         * @throws IllegalArgumentException if the line has fewer than six columns
         * @throws NumberFormatException if column 0 is not a valid long
         */
        public static CsvOutputEntry transform(List<String> line) {
            if (line.size() < EXPECTED_COLUMNS) {
                // Explicit message instead of a bare IndexOutOfBoundsException; the record is still treated as dirty.
                throw new IllegalArgumentException("Expected at least " + EXPECTED_COLUMNS
                        + " CSV columns but got " + line.size());
            }
            CsvOutputEntry outputEntry = new CsvOutputEntry();
            outputEntry.setId(Long.parseLong(line.get(0)));
            outputEntry.setName(line.get(1));
            outputEntry.setSport(line.get(2));
            outputEntry.setJob(line.get(3));
            outputEntry.setCreateTime(line.get(4));
            outputEntry.setDeleteTime(line.get(5));
            outputEntry.setProcessTime(DateFormatUtils.format(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
            return outputEntry;
        }
    }
}
Project Package
mvn clean package -DskipTests
This generates ${project.basedir}/target/custom-parser-demo.jar.

Upload a custom parser to the system
For more details, please refer to the Custom Parser Jar Package Management API.
Create Kafka tables using a custom parser
Json



CSV


