Skip to main content
 Web开发网 » 操作系统 » linux系统

如何拓展Hadoop的InputFormat为其他分隔符

2021年10月14日6050百度已收录

  在Hadoop中,常用的TextInputFormat是以换行符作为Record分隔符的。

在实际应用中,我们经常会出现一条Record中包含多行的情况,例如:

此时,需要拓展TextInputFormat以完成这个功能。

先来看一下原始实现:

Java

public class TextInputFormat extends FileInputFormat {

@Override

public RecordReader

createRecordReader(InputSplit split,

TaskAttemptContext context) {

// By default,textinputformat。

  record。delimiter = ‘/n’(Set in configuration file)

String delimiter = context。getConfiguration()。get(

"textinputformat。

  record。delimiter");

byte[] recordDelimiterBytes = null;

if (null != delimiter)

recordDelimiterBytes = delimiter。

  getBytes();

return new LineRecordReader(recordDelimiterBytes);

@Override

protected boolean isSplitable(JobContext context, Path file) {

CompressionCodec codec =

new CompressionCodecFactory(context。

  getConfiguration())。getCodec(file);

return codec == null;

public class TextInputFormat extends FileInputFormat {

@Override

public RecordReader

createRecordReader ( InputSplit split ,

TaskAttemptContext context ) {

// By default,textinputformat。

  record。delimiter = ‘/n’(Set in configuration file)

String delimiter = context 。 getConfiguration ( ) 。 get (

"textinputformat。

  record。delimiter" ) ;

byte [ ] recordDelimiterBytes = null ;

if ( null != delimiter )

recordDelimiterBytes = delimiter 。

   getBytes ( ) ;

return new LineRecordReader ( recordDelimiterBytes ) ;

@Override

protected boolean isSplitable ( JobContext context , Path file ) {

CompressionCodec codec =

new CompressionCodecFactory ( context 。

   getConfiguration ( ) ) 。getCodec ( file ) ;

return codec == null ;

根据上面的代码, 不难发现,换行符实际上是由”textinputformat。

  record。delimiter”这个配置决定的。

所以我们有种解决方案:

(1) 在Job中直接配置textinputformat。record。delimiter为”

”,这种方案是比较Hack的,很容易影响到其他代码的正常执行。

(2) 继承TextInputFormat,在return LineRecordReader时,使用自定义的分隔符。

本文采用第二种方案,代码如下:

Java

public class DocInputFormat extends TextInputFormat {

private static final String RECORD_DELIMITER = "

@Override

public RecordReader createRecordReader(

InputSplit split, TaskAttemptContext tac) {

byte[] recordDelimiterBytes = null;

recordDelimiterBytes = RECORD_DELIMITER。

  getBytes();

return new LineRecordReader(recordDelimiterBytes);

@Override

public boolean isSplitable(JobContext context, Path file) {

CompressionCodec codec = new CompressionCodecFactory(

context。

  getConfiguration())。getCodec(file);

return codec == null;

public class DocInputFormat extends TextInputFormat {

private static final String RECORD_DELIMITER = "

@Override

public RecordReader createRecordReader (

InputSplit split , TaskAttemptContext tac ) {

byte [ ] recordDelimiterBytes = null ;

recordDelimiterBytes = RECORD_DELIMITER 。

   getBytes ( ) ;

return new LineRecordReader ( recordDelimiterBytes ) ;

@Override

public boolean isSplitable ( JobContext context , Path file ) {

CompressionCodec codec = new CompressionCodecFactory (

context 。

   getConfiguration ( ) ) 。

   getCodec ( file ) ;

return codec == null ;

需要指出的是,InputFormat只是把原始HDFS文件分割成String的记录,如果你的 内有其他结构化数据,那么需要在map中自己实现deserilize的相关业务逻辑来处理。

评论列表暂无评论
发表评论
微信