How to encode and calculate bitmaps of active user IDs with different dates in MaxCompute

This article provides code examples that show how to use MaxCompute's MapReduce module to encode and calculate bitmaps of active user IDs with different dates.

By Qu Ning

Bitmaps are a technique commonly used by data developers to encode and compress user data. Because bitmaps support fast AND, OR, and NOT operations, developers can use them to filter users by attributes such as profile tags and to analyze metrics such as weekly activity.
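
As a rough illustration of these operations, the sketch below filters users with AND, OR, and AND NOT. It uses the JDK's java.util.BitSet as a stand-in for a compressed bitmap such as RoaringBitmap, and the class name, user IDs, and the "premium" tag are all hypothetical.

```java
import java.util.BitSet;

// Illustrative only: java.util.BitSet stands in for a compressed bitmap,
// and all user IDs and tags below are hypothetical.
class BitmapOpsSketch {

    // Build a bitmap in which bit i is set when user ID i is a member.
    static BitSet of(int... userIds) {
        BitSet bits = new BitSet();
        for (int id : userIds) {
            bits.set(id);
        }
        return bits;
    }

    // AND: users present in both bitmaps.
    static BitSet and(BitSet a, BitSet b) {
        BitSet result = (BitSet) a.clone();
        result.and(b);
        return result;
    }

    // OR: users present in at least one of the bitmaps.
    static BitSet or(BitSet a, BitSet b) {
        BitSet result = (BitSet) a.clone();
        result.or(b);
        return result;
    }

    // AND NOT: users present in a but absent from b.
    static BitSet andNot(BitSet a, BitSet b) {
        BitSet result = (BitSet) a.clone();
        result.andNot(b);
        return result;
    }

    public static void main(String[] args) {
        BitSet activeMonday = of(1, 2, 3);
        BitSet activeTuesday = of(2, 3, 4);
        BitSet premiumTag = of(3, 4, 5);

        System.out.println(or(activeMonday, activeTuesday));  // prints {1, 2, 3, 4}
        System.out.println(and(activeMonday, activeTuesday)); // prints {2, 3}
        // Active on both days but not tagged premium.
        System.out.println(andNot(and(activeMonday, activeTuesday), premiumTag)); // prints {2}
    }
}
```

Each helper copies its first argument before combining, so the input bitmaps can be reused across several filters.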

Consider the following MapReduce code example.

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Iterator;

public class bitmapDemo2
{

    public static class BitMapper extends MapperBase {

        Record key;
        Record value;
        @Override
        public void setup(TaskContext context) throws IOException {
            key = context.createMapOutputKeyRecord();
            value = context.createMapOutputValueRecord();
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException
        {
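            // Build a single-element bitmap for this record's user ID.
            // RoaringBitmap stores 32-bit integers, so the ID must fit in an int.
            RoaringBitmap mrb = new RoaringBitmap();
            long aid = record.getBigint("id");
            mrb.add((int) aid);
            // Use the activity date as the map output key.
            key.set(new Object[] {record.getString("active_date")});

            // Serialize the bitmap into a byte buffer, then Base64-encode it as a string value.
            ByteBuffer outbb = ByteBuffer.allocate(mrb.serializedSizeInBytes());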
            mrb.serialize(new DataOutputStream(new OutputStream(){
                ByteBuffer mBB;
                OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}
                public void close() {}
                public void flush() {}
                public void write(int b) {
                    mBB.put((byte) b);}
                public void write(byte[] b) {mBB.put(b);}
                public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}
            }.init(outbb)));
            String serializedstring = Base64.getEncoder().encodeToString(outbb.array());
            value.set(new Object[] {serializedstring});
            context.write(key, value);
        }
    }

    public static class BitReducer extends ReducerBase {
        private Record result = null;

        @Override
        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }

        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
            // Union (OR) all bitmaps that share the same active_date key.
            RoaringBitmap rbm = new RoaringBitmap();
            while (values.hasNext()) {
                Record val = values.next();
                // Decode the Base64 string back into a serialized bitmap.
                ByteBuffer newbb = ByteBuffer.wrap(Base64.getDecoder().decode((String) val.get(0)));
                ImmutableRoaringBitmap irb = new ImmutableRoaringBitmap(newbb);
                RoaringBitmap p = new RoaringBitmap(irb);
                rbm.or(p);
            }
            ByteBuffer outbb = ByteBuffer.allocate(rbm.serializedSizeInBytes());
            rbm.serialize(new DataOutputStream(new OutputStream(){
                ByteBuffer mBB;
                OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}
                public void close() {}
                public void flush() {}
                public void write(int b) {
                    mBB.put((byte) b);}
                public void write(byte[] b) {mBB.put(b);}
                public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}
            }.init(outbb)));
            String serializedstring = Base64.getEncoder().encodeToString(outbb.array());
            result.set(0, key.get(0));
            result.set(1, serializedstring);
            context.write(result);
        }
    }
    public static void main( String[] args ) throws OdpsException
    {

        System.out.println("begin.........");
        JobConf job = new JobConf();
        
        job.setMapperClass(BitMapper.class);
        job.setReducerClass(BitReducer.class);

        job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("id:string"));

        InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job);
//        +------------+-------------+
//        | id         | active_date |
//        +------------+-------------+
//        | 1          | 20190729    |
//        | 2          | 20190729    |
//        | 3          | 20190730    |
//        | 4          | 20190801    |
//        | 5          | 20190801    |
//        +------------+-------------+
        OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job);
//        +-------------+------------------------------+
//        | active_date | bit_map                      |
//        +-------------+------------------------------+
//        | 20190729    | OjAAAAEAAAAAAAEAEAAAAAEAAgA= |
//        | 20190730    | OjAAAAEAAAAAAAAAEAAAAAMA     |
//        | 20190801    | OjAAAAEAAAAAAAEAEAAAAAQABQA= |
//        +-------------+------------------------------+

        JobClient.runJob(job);
    }
}

Here is how this code works. After you package the Java program and upload it to your MaxCompute project, you can run it as a MapReduce job in MaxCompute. For each row of the input table, the mapper uses the date as the key and encodes the user ID as a bitmap. The reducer then performs an OR operation on the bitmaps that share the same date. If necessary, you can perform an AND operation instead, for example to calculate retention. The processed data is written to the target table for further processing.
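
The retention case can be sketched locally without a MaxCompute project. The example below is a minimal sketch under stated assumptions, not the job itself: java.util.BitSet and its byte layout stand in for RoaringBitmap serialization, and the class name, helper names, and sample rows are hypothetical. It groups IDs by date with OR, Base64-encodes each daily bitmap, then decodes two days and combines them with AND.

```java
import java.util.Base64;
import java.util.BitSet;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: BitSet's byte layout stands in for RoaringBitmap
// serialization; the rows and helper names are hypothetical.
class RetentionSketch {

    // Encode a bitmap as a Base64 string, similar to the bit_map column.
    static String encode(BitSet bits) {
        return Base64.getEncoder().encodeToString(bits.toByteArray());
    }

    // Decode a Base64 string produced by encode() back into a bitmap.
    static BitSet decode(String encoded) {
        return BitSet.valueOf(Base64.getDecoder().decode(encoded));
    }

    // Group rows by date and OR each user ID into that date's bitmap.
    static Map<String, String> buildDailyBitmaps(int[] ids, String[] dates) {
        Map<String, BitSet> byDate = new TreeMap<>();
        for (int i = 0; i < ids.length; i++) {
            byDate.computeIfAbsent(dates[i], d -> new BitSet()).set(ids[i]);
        }
        Map<String, String> encoded = new TreeMap<>();
        for (Map.Entry<String, BitSet> entry : byDate.entrySet()) {
            encoded.put(entry.getKey(), encode(entry.getValue()));
        }
        return encoded;
    }

    // Retention: count users active on both dates by ANDing the two bitmaps.
    static int retainedUsers(String encodedDay1, String encodedDay2) {
        BitSet retained = decode(encodedDay1);
        retained.and(decode(encodedDay2));
        return retained.cardinality();
    }

    public static void main(String[] args) {
        // Hypothetical rows: users 1, 2, 3 on the first day; users 2, 3, 4 on the second.
        int[] ids = {1, 2, 3, 2, 3, 4};
        String[] dates = {"20190729", "20190729", "20190729",
                          "20190730", "20190730", "20190730"};

        Map<String, String> daily = buildDailyBitmaps(ids, dates);
        System.out.println(retainedUsers(daily.get("20190729"), daily.get("20190730"))); // prints 2
    }
}
```

Because the daily bitmaps are stored as strings, a retention query only needs to decode the two dates it compares rather than rescan the source rows.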
