189 8069 5689

Mapreduce构建hbase二级索引-创新互联

import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.GenericOptionsParser; public class IndexBuilder {     private class MyMapper extends TableMapper {         private Map indexes = new HashMap();         private String columnFamily;         @Override         protected void map(ImmutableBytesWritable key, Result value,                 Context context) throws IOException, InterruptedException {             Set keys = indexes.keySet();             for (byte[] k : keys) {                 ImmutableBytesWritable indexTableName = indexes.get(k);                 byte[] val = value.getValue(Bytes.toBytes(columnFamily), k);                 Put put = new Put(val);// 索引表的rowkey为原始表的值                 put.add(Bytes.toBytes("f1"), Bytes.toBytes("id"), key.get());// 索引表的内容为原始表的rowkey                 context.write(indexTableName, put);             }         }         @Override         protected void setup(Context context) throws IOException,                 InterruptedException {             Configuration conf = context.getConfiguration();             String tableName = conf.get("tableName");             columnFamily = conf.get("columnFamily");             String[] qualifiers = conf.getStrings("qualifiers");             // indexes的key为列名,value为索引表名             for (String q : qualifiers) {                 indexes.put(                         Bytes.toBytes(q),                         new ImmutableBytesWritable(Bytes.toBytes(tableName                                 + "-" + q)));             }         }     }     public static void main(String[] args) throws IOException,             ClassNotFoundException, InterruptedException {         Configuration conf = HBaseConfiguration.create();         String[] otherargs = new GenericOptionsParser(conf, args)                 .getRemainingArgs();// 去除掉没有用的命令行参数         // 输入参数:表名,列族名,列名         if (otherargs.length < 3) {             System.exit(-1);         }         String tableName = otherargs[0];         String columnFamily = otherargs[1];         conf.set("tableName", tableName);         conf.set("columnFamily", columnFamily);         String[] qualifiers = new String[otherargs.length - 2];         for (int i = 0; i < qualifiers.length; i++) {             qualifiers[i] = otherargs[i + 2];         }         conf.setStrings("qualifiers", qualifiers);         Job job = new Job(conf, tableName);         job.setJarByClass(IndexBuilder.class);         job.setMapperClass(MyMapper.class);         job.setNumReduceTasks(0);         job.setInputFormatClass(TableInputFormat.class);         // 可以输出多张表         job.setOutputFormatClass(MultiTableOutputFormat.class);         Scan scan = new Scan();         scan.setCaching(1000);         TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class,                 ImmutableBytesWritable.class, Put.class, job);         job.waitForCompletion(true);     } }

创新互联-专业网站定制、快速模板网站建设、高性价比大同网站开发、企业建站全套包干低至880元,成熟完善的模板库,直接使用。一站式大同网站制作公司更省心,省钱,快速模板网站建设找我们,业务覆盖大同地区。费用合理售后完善,10多年实体公司更值得信赖。

另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


分享文章:Mapreduce构建hbase二级索引-创新互联
分享网址:http://jkwzsj.com/article/pisjs.html

其他资讯