Overview
Gobblin 0.10
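There are many ways to persist Kafka data to HDFS, such as Flume, Logstash, and Gobblin. Flume and Logstash are streaming tools, whereas Gobblin is batch-oriented: a scheduled job triggers each persistence run, and between runs nothing is read or written. This is the biggest difference from Flume and Logstash.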
Gobblin can be deployed in several ways:
1) standalone + cron;
2) MR + Oozie/Azkaban, etc.;
3) Docker;
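Option 3 is the most convenient: because Gobblin can write all job state to HDFS, it makes no difference which node Gobblin is started on. In addition, the metadata is updated only after the data has been synced, so a failure in Kafka, HDFS, or Gobblin itself does not cause data loss.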
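The "update metadata only after the data is synced" ordering can be illustrated with a toy model. This is only a sketch under stated assumptions: the class, fields, and method names below are hypothetical and are not Gobblin's API; the list stands in for a Kafka topic and the `committedOffset` field stands in for the state store on HDFS.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: none of these names come from Gobblin.
class CommitAfterPublishSketch {
    // Stand-in for the state store: the last committed offset.
    long committedOffset = 0;
    // Stand-in for the published output directory.
    final List<String> published = new ArrayList<>();

    // One batch run: read from the committed offset, publish, then commit.
    void runOnce(List<String> topic, boolean failBeforePublish) {
        List<String> batch = new ArrayList<>(topic.subList((int) committedOffset, topic.size()));
        if (failBeforePublish) {
            // Simulated crash: nothing is published and the offset is untouched.
            return;
        }
        published.addAll(batch);         // 1. publish the data
        committedOffset = topic.size();  // 2. only then advance the offset
    }

    public static void main(String[] args) {
        CommitAfterPublishSketch job = new CommitAfterPublishSketch();
        List<String> topic = Arrays.asList("a", "b", "c");
        job.runOnce(topic, true);   // failed run: state unchanged
        job.runOnce(topic, false);  // next run re-reads from offset 0
        System.out.println(job.published + " offset=" + job.committedOffset);
    }
}
```

Because the offset moves only after publishing, a failed run leaves the state where it was and the next scheduled run picks up the same records again.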
1 Configuration
#job
job.name=test_job
job.group=test_group
job.schedule=0 0 */1 * * ?
job.lock.enabled=false
#source
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
kafka.brokers=$kafka_brokers
bootstrap.with.offset=latest
topic.whitelist=$kafka_topics
mr.job.max.mappers=1
#writer
writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
writer.partitioner.class=gobblin.writer.partitioner.TimeBasedWriterPartitioner
writer.partition.columns=time
writer.partition.level=hourly
writer.partition.pattern=yyyyMMdd/HH
writer.partition.timezone=Asia/Shanghai
data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher
#metrics
metrics.reporting.file.enabled=true
metrics.reporting.file.suffix=txt
#fs
fs.uri=hdfs://$name_node:8020
writer.fs.uri=${fs.uri}
state.store.fs.uri=${fs.uri}
data.publisher.final.dir=${env:GOBBLIN_WORK_DIR}/job-output
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
state.store.dir=${env:GOBBLIN_WORK_DIR}/state-store
mr.job.root.dir=${env:GOBBLIN_WORK_DIR}/working
task.data.root.dir=${env:GOBBLIN_WORK_DIR}/task-data
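Only $kafka_brokers, $kafka_topics, and $name_node need to be changed.
This configuration runs in standalone mode once an hour. Each run uses the time field in the data to derive a time partition and stores the records under the configured directory on HDFS.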
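To make the partition layout concrete, here is a small sketch of how a record timestamp maps to a directory name under the pattern yyyyMMdd/HH and the timezone Asia/Shanghai from the configuration above. It uses java.time purely for illustration; the class and method names are hypothetical, and this is not how Gobblin implements the formatting internally.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Illustration only: hypothetical helper, not part of Gobblin.
class PartitionPathSketch {
    // Formats an epoch-millisecond timestamp as a partition path
    // using the given pattern and timezone.
    static String partitionPath(long epochMillis, String pattern, String timezone) {
        DateTimeFormatter formatter =
                DateTimeFormatter.ofPattern(pattern).withZone(ZoneId.of(timezone));
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // Epoch 0 is 08:00 on 1970-01-01 in Asia/Shanghai.
        System.out.println(partitionPath(0L, "yyyyMMdd/HH", "Asia/Shanghai")); // prints 19700101/08
    }
}
```

Records whose timestamps fall in the same local hour end up under the same directory, which is what the hourly partition level amounts to.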
2 Startup
export GOBBLIN_JOB_CONFIG_DIR=/opt/gobblin/gobblin-dist/job_conf
export GOBBLIN_WORK_DIR=/opt/gobblin/gobblin-dist/work_dir
bin/gobblin-standalone.sh start
3 Customization
1) Partition by the current time instead of the time in the data
package gobblin.writer.partitioner;

import gobblin.configuration.State;

// Partitions by the wall-clock time at write, not by a timestamp inside the record.
public class DefaultTimeBasedWriterPartitioner extends TimeBasedWriterPartitioner {
    public DefaultTimeBasedWriterPartitioner(State state, int numBranches, int branchId) {
        super(state, numBranches, branchId);
    }

    public long getRecordTimestamp(Object record) {
        return System.currentTimeMillis();
    }
}
Configuration:
writer.partitioner.class=gobblin.writer.partitioner.DefaultTimeBasedWriterPartitioner
2) Keep only JSON records and append a newline to each
package gobblin.source.extractor.extract.kafka;

import gobblin.configuration.WorkUnitState;
import gobblin.source.extractor.Extractor;

import java.io.IOException;

public class JsonKafkaSimpleSource extends KafkaSimpleSource {
    public JsonKafkaSimpleSource() {}

    @Override
    public Extractor<String, byte[]> getExtractor(WorkUnitState state) throws IOException {
        return new JsonKafkaSimpleExtractor(state);
    }
}
package gobblin.source.extractor.extract.kafka;

import gobblin.configuration.WorkUnitState;
import gobblin.kafka.client.ByteArrayBasedKafkaRecord;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class JsonKafkaSimpleExtractor extends KafkaSimpleExtractor {
    public JsonKafkaSimpleExtractor(WorkUnitState state) {
        super(state);
    }

    @Override
    protected byte[] decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException {
        byte[] resultBytes = kafkaConsumerRecord.getMessageBytes();
        String result = new String(resultBytes, "UTF-8");
        // Cheap shape check: the message must start with '{' and end with '}'.
        if (result.length() > 2 && result.charAt(0) == '{' && result.charAt(result.length() - 1) == '}') {
            // Append a newline so each record lands on its own line in the output file.
            return (result + "\n").getBytes("UTF-8");
        } else {
            System.out.println("[" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + "]found invalid json : " + result);
            // Invalid messages are written as empty content.
            return "".getBytes();
        }
    }
}
Configuration:
source.class=gobblin.source.extractor.extract.kafka.JsonKafkaSimpleSource
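The filtering rule in the extractor can be tried out without a Kafka or Gobblin setup. The sketch below pulls the same check (longer than two characters, starts with '{', ends with '}') into a standalone class; the class and method names are hypothetical and exist only for this illustration. Note that the check looks only at the outer shape of the message and does not parse it, so malformed content between braces still passes.

```java
import java.nio.charset.StandardCharsets;

// Standalone illustration of the extractor's filter; names are hypothetical.
class JsonLineFilterSketch {
    // Same shape check as the extractor: more than two characters,
    // starting with '{' and ending with '}'.
    static boolean looksLikeJsonObject(String text) {
        return text.length() > 2
                && text.charAt(0) == '{'
                && text.charAt(text.length() - 1) == '}';
    }

    // Returns the message with a trailing newline, or an empty array if rejected.
    static byte[] toLine(byte[] message) {
        String text = new String(message, StandardCharsets.UTF_8);
        if (looksLikeJsonObject(text)) {
            return (text + "\n").getBytes(StandardCharsets.UTF_8);
        }
        return new byte[0];
    }

    public static void main(String[] args) {
        byte[] ok = toLine("{\"a\":1}".getBytes(StandardCharsets.UTF_8));
        byte[] bad = toLine("oops".getBytes(StandardCharsets.UTF_8));
        System.out.println(ok.length + " " + bad.length);
    }
}
```

One consequence of the length check is that an empty object "{}" is rejected as well, since it is exactly two characters long.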
4 Docker image
https://hub.docker.com/r/gobblin/gobblin-standalone
docker run -d gobblin/gobblin-standalone:ubuntu-gobblin-0.10.0
References:
https://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/
https://gobblin.readthedocs.io/en/latest/user-guide/Configuration-Properties-Glossary/
Reposted from: https://www.cnblogs.com/barneywill/p/10959519.html