Hadoop 数据压缩和序列化

压缩

文件压缩有两个好处:减少了存储文件所需的空间,并加速数据在网络和磁盘间的传输

常见的压缩格式:


所有的压缩算法都需要在空间和时间上进行权衡,其中splitable表明是否可分,即是否可以搜索数据流的任意位置,并从该位置开始读取数据,可分的数据更适合MapReduce框架处理,如果使用gzip文件作为输入,hadoop通过文件扩展名确定文件不可分,将会使用一个map任务处理所有的数据块,在这种情况下最好在之前的数据处理中,选择合适的数据块大小,将文件切分成块,然后对每个块建立压缩文件

Hadoop 的 codec 算法

通用压缩解压缩算法接口CompressionCodec

public interface CompressionCodec {
  // 对写入的数据进行压缩,输出到out
  CompressionOutputStream createOutputStream(OutputStream out) 
  // 对输入数据流in中的数据进行解压缩
  CompressionInputStream createInputStream(InputStream in) throws IOException;
...
}

public interface SplittableCompressionCodec extends CompressionCodec {
...
  SplitCompressionInputStream createInputStream(InputStream seekableIn,
      Decompressor decompressor, long start, long end, READ_MODE readMode)
      throws IOException;
}

CompressionCodecFactory根据文件名称找到正确的codec

String uri;
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(inputPath);

默认情况下会优先使用native类库实现codec,如果查找不到才会使用纯Java实现的算法

hadoop支持的压缩格式有DEFLATE,gzip,bzip2,LZ4,Snappy,此外还可以通过设定io.compression.codecs设定添加额外的编解码器


串行化

Writable

Hadoop 的序列化格式是Writable,对应接口定义了两个方法,用来将数据写入DataOutput二进制流以及从DataInput二进制流读出状态

package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
public interface Writable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}

WritableComparable

WritableComparable同时实现WritableComparable接口

public interface WritableComparable<T> extends Writable, Comparable<T> 

类型比较对MapReduce来说是至关重要,因为中间有一个基于键值的排序阶段
任何在Hadoop Map-Reduce框架中用作key的类型都应该实现这个接口

RawComparator是一个优化的比较器,可以直接比较对象的字节流,不需要反序列化

public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}

WritableComparator

WritableComparator是一个比较器,用来比较WritableComparable类型的数据,实现了RawComparator接口,提供compare(byte[],int,int,byte[],int,int)方法,默认方法是反序列化对象,调用对象本身的compareTo方法

可以通过继承WritableComparator对比较函数compare进行重载优化,同时注册相关类对应的比较器define(Class c, WritableComparator comparator),之后可以直接获取WritableComparator get(Class<? extends WritableComparable> c)

Writable 类

Writable 类的层次结构

Java 基本类型

Java 基本类型都有实现Writable接口的包装类型,char类型除外
通过get和set方法读取存储封装的值
int 和 long 类型有对应的变长格式封装,方便节省空间


Text 类型

使用标准UTF-8编码存储文本

索引:指的是编码后字节序列中的位置
getlength:字节的长度
charAt:Unicode编码的一个码点,类似于String的codePointAt方法
find:返回第一个查找到的位置对应的字节偏移量
Unicode

Unicode 字符集

  • 码点 code point : 一个编码表中的某个字符对应的代码值,书写方式 U+后边跟十六进制数字
  • Unicode :字符集,码点从U+0000到U+10FFFF一共21位,1,112,064个码位,分为17个代码级别,第一个平面称为基本多语言平面(Basic Multilingual Plane, BMP),或称第零平面(Plane 0),其他平面称为辅助平面(Supplementary Planes),基本多语言平面内,从U+D800到U+DFFF之间的码位区块永久保留不映射Unicode字符
  • UTF-8:变长编码规则,把Unicode字符集编码为1到4个字节
  • UTF-16:变长编码规则,把Unicode字符集编码为2个或者4个字节,Java的char类型就是UTF-16编码的一个代码单元
UTF-8
UTF-16

BytesWritable

对字节序列的封装,存储格式为字节数目(4 bytes整数字段),后面跟字节内容本身,内容可以通过set方法改变

NullWritable

不可变单例类型,调用NullWritable.get()获取,序列化长度为0,不从数据流中读取,也不写入数据

ObjectWritable / GenericWritable

ObjectWritable是一个通用的封装,用来处理String,arrays,primitive types,null等等,适合用在一个字段有多种数据类型的情况
在Hadoop RPC中,用来封装和解封装方法参数和返回类型
序列化时,每次要写出封装的类名称,更为高效的方法是使用GenericWritable,声明可能需要的类型,用相应的索引来标识类别,消耗一个字节

Writable 集合类

有6个集合类,分别是ArrayWritable, ArrayPrimitiveWritable, TwoDArrayWritable, MapWritable, SortedMapWritable, EnumSetWritable

  • ArrayWritable / TwoDArrayWritable :对应数组和二维数组的Writabe实例
  • ArrayPrimitiveWritable:对Java 基本数据类型和其数组的包装器类,没有涉及底层复制
  • MapWritable /SortedMapWritable:实现java.util.Map<Writable, Writable> / java.util.SortedMap<WritableComparable, Writable>,K/V的类型信息是串行化格式的一部分,占用一个字节,作为对应类型信息的一个索引,可以存放127种自定义的类型,可以使用NullWritable作为V,存储set集合
  • EnumSetWritable:枚举类型

自定义Writable

可以控制二进制表示形式和排序顺序,更好的提升性能
自定义的实现必须有一个默认构造函数,以便 MapReduce 框架可以实例化,并调用readFields填充字段

自定义比较函数器,直接对字节序列进行比较

 //内嵌一个Comparator内部类
public static class Comparator extends WritableComparator{
    public Comparator() {
      super(外部类.class);
    }
    
    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
        .....例如
      WritableUtils.decodeVIntSize(b1[s1]) //解析vint / vlong的第一个字节来确定字节长度
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      .... //两种比较方法应该有相同的语义
    }
}
static {
    WritableComparator.define(外部类.class, new Comparator());
}

序列化框架

序列化框架都必须实现org.apache.hadoop.io.serializer.Serialization接口,通过io.serializations属性指定使用的框架,可以通过代码,也可以通过core-default.xml文件指定

默认属性如下:意味着只有Writable 或 Avro 对象可以在外部序列化/反序列化

<property>
    <name>io.serializations</name>
    <value>
        org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization
    </value>
    <source></source>
</property>

另外还提供了一个Java Serialization序列化形式对应的框架org.apache.hadoop.io.serializer.JavaSerialization

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容