项目背景

今年,相信很多人都会在各个商场或者是电影院中可以看到各种娃娃机、幸运盒子、口红挑战等等类似机器。在抖音上《口红挑战》这款机子也是火的一塌糊涂,你只要花 10 块钱就有可能赢走一个 YSL 口红,想想就觉得很有诱惑力。或许这些机器被程序员一瞧就知道其中的猫腻,但是这个机器瞄准的是那些容易冲动消费的消费者,比如情侣、女生、带小孩的大人;像程序员这么奇葩的生物,一般都是直接被无视的哈哈。

然后呢,我们公司就是为这些设备的正常运行提供解决方案的。因此才有我今天的爬坑总结,哈哈哈哈哈.....

我们提供的解决方案是这样的,在一个门店里面会包含如下的设备:娃娃机、口红挑战、排行榜、中控,当然其中还有我们的后台服务。那么首先我会先介绍一下整个系统的架构以及各个设备的职责:

门店系统结构图.png


要解决的问题

同步加载资源

关于资源同步的,首先我们先理一下我们需要同步的资源有哪些,这些资源分别为: apk 安装包、图片、h5 相关的 index 资源。

资源更新的方式

关于更新的方式,这里其实就有一个比较坑的地方了,一开始的时候我们选择的资源更新方式比较傻,直接使用 websocket 进行资源更新的,一开始的时候只有一个设备进行连接,问题倒是不大,但是后来发现多台设备连接同时更新资源的时候问题特别大,连接经常断开,导致资源更新失败。那么这里是我遇到的第一个坑。发现这个坑之后呢,我的选择资源更新的方式就更改为:NanoHttpd。NanoHttpd 是一个开源库,是用 Java 实现的,它可以在 Android 设备上建立一个轻量级的 web server。其实在 Android 设备上创建一个轻量级的 web server 才是我们一开始就应该要选择的方向。为什么呢?首先 NanoHttpd 的使用是比较简单的,因此我们只需要几行代码就可以实现一个 web server 了;其次呢,NanoHttpd 是比较稳定的,相对于我们手动使用 websocket 去实现一个资源分发要稳定太多了。

那么在我们选择了资源的更新方式之后,有另外一个问题浮出水面了,关于服务器的 IP 地址。我们都知道,关于 Android 设备连接上移动互联网或者 WiFi 的时候都会被自动分配一个 IP 地址,因此这个 IP 地址是会变化的,我们的设备在每天晚上都会关机,然后在第二天开启重启的时候又会被分配到一个新的 IP 地址,因此服务器的 IP 地址是一直在变化的,所以这里我们需要做的是想办法把某个设备的 IP 地址给固定下来。那么接下来就来讲讲关于 NanoHttpd 创建轻量级的 web server 和如何解决 IP 变化的问题。

NanoHttpd 实现 web server

    File resourceDir = new File(Environment.getExternalStorageDirectory(), "myRootDir");
    SimpleWebServer httpServer = new SimpleWebServer(null, 18103, resourceDir, true, "*");
    httpServer.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);

解决 IP 变化的问题

在 Android 设备中,它的一个 IP 地址是会变化的,而且每个门店都会有一个自己的内部中控机,那么我们是必须要处理 IP 地址变化的这个问题的。我们的解决方案有如下两个步骤:

  1. 在路由器中根据 Mac 地址,为门店内的中控设备设置固定的 IP 地址
  2. 为每个娃娃机和口红挑战设备提供一个 IP 地址的配置文件,这个文件里面有门店中控的 IP 地址信息,放在 U 盘的指定目录下,但插入设备的时候,由 Rocket 程序将文件从 U 盘中将配置文件 copy 到设备的制定目录下,设备每次启动的时候都需要先读取配置文件,再连接本地的服务器。

资源什么时候更新

关于资源更新的,我们首先需要明确我们需要更新的资源有哪些以及我们需要更新的方式。

更新的资源

资源更新流程图.png

    try {
        // banner 资源文件
        String fileName = fileFilter.getAbsolutePath().substring(baseDirLength);
        RandomAccessFile randomAccessFile = new RandomAccessFile(fileFilter,"r");
        byte[] buf = new byte[(int) randomAccessFile.length()];
        randomAccessFile.read(buf);
        randomAccessFile.close();
        MessageDigest md5 = MessageDigest.getInstance("md5");
        byte[] hash = md5.digest(buf);
        String hashStr = ByteToHex(hash,0,hash.length);
        res.bannerFiles.put(hashStr,fileName);
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
    // 字节转换为 16 进制
    public static String ByteToHex(byte[] bt, int offset, int len) {
        StringBuffer sb = new StringBuffer();
        for (int i = offset; i < offset + len; i++) {
            int tmp = bt[i] & 0xff;
            String tmpStr = Integer.toHexString(tmp);
            if (tmpStr.length() < 2)
                sb.append("0");
            sb.append(tmpStr);
        }
        return sb.toString().toUpperCase();
    }
    public static Observable<Boolean> updateBannerRes(ResListBean resListBean) throws IOException, NoSuchAlgorithmException {
        // 获取远程 banner 的文件
        HashMap<File, String> remoteFiles = new HashMap();
        for (HashMap.Entry<String, String> entry : resListBean.bannerFiles.entrySet()) {
            remoteFiles.put(new File(entry.getValue()), entry.getKey());
        }

        FileUtils.GetFilesInDir(bannerDir,localBannerList,null);
        int baseDirLength = resDir.getAbsolutePath().length()+1;
        // step1:删除本地文件(远程 banner 中没有的文件)
        for (File localFile : localBannerList) {
            File chileFile = new File(localFile.getAbsolutePath().substring(baseDirLength));
            if (!remoteFiles.containsKey(chileFile)) {
                MainActivity.appendAndScrollLog(String.format("删除 banner 资源文件 %s\n", localFile.getAbsolutePath()));
                localFile.delete();
            }
        }

        // 下载本地没有的文件
        ArrayList<Observable<File>> taskList = new ArrayList();
        for (Map.Entry<File, String> fileEntry : remoteFiles.entrySet()) {
            File file = new File(resDir,fileEntry.getKey().getAbsolutePath());

            // step2:本地中存在和远程相同的文件名
            if (localBannerList.contains(file)) {
                // step3:根据 hash 值判断是否为同一文件
                String hashStr = FileUtils.getFileHashStr(file);
                if (TextUtils.equals(hashStr,fileEntry.getValue())){
                    MainActivity.appendAndScrollLog(String.format("保留 banner 文件 %s\n", file.getAbsolutePath()));
                    taskList.add(Observable.just(file));
                    continue;
                }
            }

            // step4:下载本地没有的文件
            String url = new URL("http", Config.instance.centralServerAddress,
                    Config.instance.httpPort,
                    new File(BuildConfig.APPLICATION_ID, fileEntry.getKey().getAbsolutePath()).getAbsolutePath()).toString();
            // step5:加入文件下载列表
            taskList.add(DownLoadUtils.getDownLoadFile(url,file));
        }

        return Observable.concat(taskList)
                .toFlowable(BackpressureStrategy.MISSING)
                .parallel()
                .runOn(Schedulers.io())
                .sequential()
                .toList()
                .observeOn(Schedulers.computation())
                .map(new Function<List<File>, ArrayList<File>>() {
                    @Override
                    public ArrayList<File> apply(List<File> files) throws Exception {
                        ArrayList<File> list = new ArrayList();
                        for (File file : files) {
                            if (!file.getAbsolutePath().isEmpty()) {
                                list.add(file);
                            }
                        }
                        if (list.size() > 0) {
                            if (!Utils.EqualCollection(list, localBannerList)) {
                                Collections.sort(list);
                            } else {
                                list.clear();
                            }
                        }
                        return list;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<ArrayList<File>, Boolean>() {
                    @Override
                    public Boolean apply(ArrayList<File> list) throws Exception {
                        if (list.size() > 0) {
                            localBannerList = list;
                            webViewHasLoad = false;
                            loadH5();
                        }
                        return true;
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<Boolean, Boolean>() {
                    @Override
                    public Boolean apply(Boolean aBoolean) throws Exception {
                        FileUtils.DelEmptyDir(resDir);
                        return true;
                    }
                })
                .toObservable();
    }

程序升级的问题

关于程序的升级,相比较于图片资源的更新要简单许多。

版本更新流程.png


    public static Observable<Boolean> updateGame(ResListBean res) throws IOException, InterruptedException {
        ArrayList<File> apkList = new ArrayList();
        FileUtils.GetFilesInDir(resDir, apkList, new String[]{
                ".apk",
        });
        // 删除本地存在的 apk 包
        for (File file : apkList) {
            file.delete();
        }
        do {
            if (res.UpdateApk == null || res.UpdateApkVersion == null) {
                break;
            }
            // 判断是否需要升级
            if (BuildConfig.VERSION_CODE >= res.UpdateApkVersionCode) {
                break;
            }

            // apk 的 URL
            final String url = new URL("http", Config.instance.centralServerAddress, Config.instance.httpPort, new File(BuildConfig.APPLICATION_ID, res.UpdateApk).getAbsolutePath()).toString();
            MainActivity.appendAndScrollLog(String.format("下载升级文件 %s\n", url));
            // 下载 apk 文件
            return DownLoadUtils.getDownLoadFile(url,resDir.getAbsolutePath(),res.UpdateApk)
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .flatMap(new Function<File, ObservableSource<String>>() {
                        @Override
                        public ObservableSource<String> apply(File file) throws Exception {
                            String path = file.getAbsolutePath();
                            MainActivity.appendAndScrollLog(String.format("升级文件下载完成 %s %s\n", path, url));
                            PackageManager pm = MainActivity.instance.getPackageManager();
                            PackageInfo pi = pm.getPackageArchiveInfo(path, 0);
                            if (pi == null) {
                                MainActivity.appendAndScrollLog(String.format("升级文件打开失败 %s\n", path));
                                return Observable.just("");
                            }
                            MainActivity.appendAndScrollLog(String.format("升级文件对比:Native(%s %s)/Remote(%s %s)\n", BuildConfig.APPLICATION_ID, BuildConfig.VERSION_NAME, pi.packageName, pi.versionName));
                            if (!BuildConfig.APPLICATION_ID.equals(pi.packageName)
                                    || BuildConfig.VERSION_CODE >= pi.versionCode) {
                                return Observable.just("");
                            }
                            return Observable.just(path);
                        }
                    })
                    .flatMap(new Function<String, Observable<Boolean>>() {
                        @Override
                        public Observable<Boolean> apply(String updateApk) throws Exception {
                            if (!updateApk.isEmpty()) {
                                Log.e(TAG, "等待游戏结束后安装升级文件...");
                                MainActivity.appendAndScrollLog("等待游戏结束后安装升级文件...\n");
                                synchronized (GamePlay.class) {//防止在游戏运行时更新版本
                                    Log.e(TAG, "发布广播");
                                    Intent intent = new Intent();
                                    intent.setAction(Config.updateBroadcast);
                                    intent.putExtra("apk", updateApk);
                                    MainActivity.instance.sendBroadcast(intent);
                                    System.exit(0);
                                }
                            }
                            return Observable.just(true);
                        }
                    });
        } while (false);
        return Observable.just(true);
    }

资源文件下载

关于资源文件的下载,我是选择 okdownload。okdownload 是一个支持多线程,多任务,断点续传,可靠,灵活,高性能以及强大的下载引擎。详情可以去看 okdownload GitHub 地址

单文件下载

DownloadTask task = new DownloadTask.Builder(url, parentFile)
         .setFilename(filename)
         // the minimal interval millisecond for callback progress
         .setMinIntervalMillisCallbackProcess(30)
         // do re-download even if the task has already been completed in the past.
         .setPassIfAlreadyCompleted(false)
         .build();


task.enqueue(listener);

// cancel
task.cancel();

// execute task synchronized
task.execute(listener);

多文件下载

final DownloadTask[] tasks = new DownloadTask[2];
tasks[0] = new DownloadTask.Builder("url1", "path", "filename1").build();
tasks[1] = new DownloadTask.Builder("url2", "path", "filename1").build();
DownloadTask.enqueue(tasks, listener);

public class DownLoadUtils {

    /**
     * 从中控下载文件到本地
     * @param url
     * @param parentPath            保存到本地文件的父文件路径
     * @param downloadFileName      保存到本地的文件名
     * @return
     */
    public static Observable<File> getDownLoadFile(String url,String parentPath,String downloadFileName){
        // 下载本地没有的文件
        MainActivity.appendAndScrollLog(String.format("开始下载资源文件 %s\n", url));
        final DownloadTask task = new DownloadTask.Builder(url, parentPath, downloadFileName).build();
        return Observable.create(new ObservableOnSubscribe<File>() {
            @Override
            public void subscribe(final ObservableEmitter<File> emitter) throws Exception {
                task.enqueue(new DownloadListener2() {
                    @Override
                    public void taskStart(DownloadTask task) {

                    }

                    @Override
                    public void taskEnd(DownloadTask task, EndCause cause, Exception realCause) {
                        if (cause != EndCause.COMPLETED) {
                            MainActivity.appendAndScrollLog(String.format("资源文件下载失败 %s %s\n", cause.toString(), task.getUrl()));
                            emitter.onNext(new File(""));
                            emitter.onComplete();
                            return;
                        }
                        File file = task.getFile();
                        MainActivity.appendAndScrollLog(String.format("资源文件下载完成 %s\n", file.getAbsolutePath()));
                        emitter.onNext(file);
                        emitter.onComplete();
                    }
                });
            }
        }).retry();
    }

    /**
     * 从中控下载文件到本地
     * @param url
     * @param saveFile  保存到本地的文件
     * @return
     */
    public static Observable<File> getDownLoadFile(String url, File saveFile){
        return getDownLoadFile(url,saveFile.getParentFile().getAbsolutePath(),saveFile.getName());
    }
}

屏蔽下拉菜单和底部导航栏

像娃娃机和格子机这些设备都是在线下直接面向用户的,因此我们不能将我们的 Android 设备全部都展现给我们的用户,我们需要对用户的行为做些限制,例如禁止用户通过导航栏或者下拉菜单退出当前程序,防止他们做出一些危险的操作。我的解决方案是把当前的 rocket 程序设置为默认启动和桌面应用程序,并将 Android 设备中自带的 launcher 程序 和 systemui 程序给禁用掉,那么设备一开始启动的时候就会启动我们的 rocket 应用,并成功的禁止了用户使用导航栏和下拉菜单来做非法的操作。

    public static void enableLauncher(Boolean enabled) {
        List<PackageInfo> piList = MainActivity.instance.packageManager.getInstalledPackages(0);
        ArrayList<String> packages = new ArrayList();
        for (PackageInfo pi : piList) {
            String name = pi.packageName;
            if (name.contains("systemui") || name.contains("launcher")) {
                packages.add(name);
            }
        }
        for (String packageName : packages) {
            su(String.format("pm %s %s\n", enabled ? "enable" : "disable", packageName));
        }
    }

    /**
     *  执行 adb 指令
     *
     */
    public static int su(String cmd) {
        try {
            Process p = Runtime.getRuntime().exec("su");
            DataOutputStream os = new DataOutputStream(p.getOutputStream());
            os.writeBytes(cmd);
            os.writeBytes("exit\n");
            os.flush();
            os.close();
            return p.waitFor();
        } catch (Exception ex) {
            return -1;
        }
    }

Iot 的实现

关于 IoT 的实现,我们这边使用的是阿里的《微消息队列 for IoT》服务,关于《微消息队列 for IoT》服务,阿里的解释如下:

微消息队列 for IoT 是消息队列(MQ)的子产品。针对用户在移动互联网以及物联网领域的存在的特殊消息传输需求,消息队列(MQ) 通过推出微消息队列 for IoT 开放了对 MQTT 协议的完整支持

名词 解释
Parent Topic MQTT 协议基于 Pub/Sub 模型,因此任何消息都属于一个 Topic。根据 MQTT 协议,Topic 存在多级,定义第一级 Topic 为父 Topic(Parent Topic),使用 MQTT 前,该 Parent Topic 需要先在 MQ 控制台创建。
Subtopic MQTT 的二级 Topic,甚至三级 Topic 都是父 Topic 下的子类。使用时,直接在代码里设置,无需创建。需要注意的是 MQTT 限制 Parent Topic 和 Subtopic 的总长度为64个字符,如果超出长度限制将会导致客户端异常。
Client ID MQTT 的 Client ID 是每个客户端的唯一标识,要求全局唯一,使用相同的 Client ID 连接 MQTT 服务会被拒绝

Android 中实现 iot

关于显示 iot 连接的实现过程是这样的:首先我们将设备的三元组从管理后台中批量生成,文件名的格式为 deviceName.json(例如:00001.json),里面是关于每个设备的三元组信息;接着我们将装有三元组文件的 U 盘插入到 Android 设备中(娃娃机或者口红挑战);rocket 程序会自动监测到 U 盘的插入并将文件剪切到 Android 设备的制定目录下;再接着 Android 设备可以去读取指定文件中三元组信息;最后使用此三元组进行连接 mqtt。

    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'
属性 用处
productKey 对应程序的 key,类似于 appid
deviceName 对应上述的 Client ID,用来唯一识别一台 Android 设备的
deviceSecret 使用 HmacSHA1 算法计算签名字符串,并将签名字符串设置到 Password 参数中用于鉴权

Android 和硬件通信

在娃娃机和口红挑战的这两个设备中,我们都需要和设备进行通信,例如:娃娃机投币、娃娃机出礼反馈、按下选中口红的格子等等这些都是需要和硬件模块进行通信的。在关于串口通信的框架选择方面,我们主要是选择 Google 的 android-serialport-api 来实现。项目原地址

// su默认路径为 "/system/bin/su"
// 可通过此方法修改
SerialPort.setSuPath("/system/xbin/su");
    static void init() throws IOException {
        SerialPort.setSuPath("/system/xbin/su");
        // 设置串口号及波特率
        serialPort = new SerialPort(Config.serialPort, Config.baudrate);
        // 接收指令流
        inputStream = serialPort.getInputStream();
        // 发送指令流
        outputStream = serialPort.getOutputStream();
        // 每隔 100ms 处理机器信息
        Observable.interval(100, TimeUnit.MILLISECONDS)
                .observeOn(serialScheduler)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        // 处理机器发送的指令
                        handleRecv();
                    }
                });
    }
    static ObservableSource<MqttMessage> addCoins(final AddCoinsDownstream msg) {
        return Observable.create(new ObservableOnSubscribe<MqttMessage>() {
            @Override
            public void subscribe(ObservableEmitter<MqttMessage> emitter) throws Exception {
                currentUser = msg.u;
                currentHeadUrl = msg.h;
                currentNickname = msg.nk;
                byte[] buf = new byte[]{0x11, addCoinsCmd, msg.num, msg.c, 0, 0x00, 0x00};
                byte[] ret = sign(buf);
                try {
                    outputStream.write(ret);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                penddingCmd = addCoinsCmd;
                penddingEmitter = emitter;
            }
        })
                .subscribeOn(serialScheduler);
    }

    static void handleRecv() {
        try {
            for (; ; ) {
                int len = inputStream.available();
                if (len <= 0) {
                    break;
                }
                len = inputStream.read(buf, bufReadOffset, buf.length - bufReadOffset);
                //Log.d("serialPort", String.format("read: %s", byteToHex(buf, bufReadOffset, len)));
                bufReadOffset += len;
                for (; ; ) {
                    if (bufParseEnd == -1) {
                        for (; bufParseStart < bufReadOffset; bufParseStart++) {
                            if (buf[bufParseStart] == (byte) 0xAA) {
                                bufParseEnd = bufParseStart + 1;
                                break;
                            }
                        }
                    }
                    if (bufParseEnd != -1) {
                        for (; bufParseEnd < bufReadOffset; bufParseEnd++) {
                            if (buf[bufParseEnd] == (byte) 0xAA) {
                                bufParseStart = bufParseEnd;
                                bufParseEnd += 1;
                                continue;
                            }
                            if (buf[bufParseEnd] == (byte) 0xDD) {
                                if (bufParseEnd - bufParseStart >= 5) {
                                    bufParseEnd += 1;
                                    byte size = buf[bufParseStart + 1];
                                    byte index = buf[bufParseStart + 2];
                                    byte cmd = buf[bufParseStart + 3];
                                    byte check = (byte) (size ^ index ^ cmd);
                                    for (int i = bufParseStart + 4; i < bufParseEnd - 2; i++) {
                                        check ^= buf[i];
                                    }
                                    if (check == buf[bufParseEnd - 2]) {
                                        //Log.d("serialPort", String.format("protocol: %s, size: %d, index: %d, cmd: %d, check: %d, data: %s", byteToHex(buf, bufParseStart, bufParseEnd - bufParseStart), size, index, cmd, check, byteToHex(buf, bufParseStart + 4, size - 3)));
                                        switch (cmd) {
                                            // 心跳
                                            case heartBeatCmd: {
                                            }
                                            break;

                                            // 上分
                                            case addCoinsCmd: {

                                            }
                                            break;

                                            // 游戏结果
                                            case gameResultCmd: {
                                                boolean gift = buf[bufParseStart + 7] != 0;
                                                IoT.sendGameResult(gift);
                                                if (gift) {
                                                    // 发送用户信息到中控,进行排行榜显示
                                                    WSSender.getInstance().sendUserInfo(currentUser, currentHeadUrl, currentNickname);
                                                }
                                            }
                                            break;
                                            default:
                                                break;
                                        }
                                    }
                                }
                                bufParseStart = bufParseEnd;
                                bufParseEnd = -1;
                                break;
                            }
                        }
                    }
                    if (bufParseStart >= bufReadOffset || bufParseEnd >= bufReadOffset) {
                        break;
                    }
                }
                if (bufReadOffset == buf.length) {
                    System.arraycopy(buf, bufParseStart, buf, 0, bufReadOffset - bufParseStart);
                    if (bufParseEnd != -1) {
                        bufParseEnd -= bufParseStart;
                        bufReadOffset = bufParseEnd;
                    } else {
                        bufReadOffset = 0;
                    }
                    bufParseStart = 0;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

websocket 通信

在中控和娃娃机进行通信的方式我们是选择 websocket 进行的。中控端是 server,然后娃娃机是 client。

server


class WSServer extends WebSocketServer {
    private MainActivity mainActivity;

    public void setMainActivity(MainActivity mainActivity) {
        this.mainActivity = mainActivity;
    }

    WSServer(InetSocketAddress address) {
        super(address);
    }

    @Override
    public void onOpen(WebSocket conn, ClientHandshake handshake) {
        mainActivity.appendAndScrollLog("客户端:" + conn.getRemoteSocketAddress() + " 已连接\n");
    }

    @Override
    public void onClose(WebSocket conn, int code, String reason, boolean remote) {
        mainActivity.appendAndScrollLog("客户端:" + conn.getRemoteSocketAddress() + " 已断开\n");
    }

    @Override
    public void onMessage(WebSocket conn, final String message) {
        Observable.create(new ObservableOnSubscribe<SocketMessage>() {
            @Override
            public void subscribe(ObservableEmitter<SocketMessage> emitter) throws Exception {
                final SocketMessage socketMessage = JsonIterator.deserialize(message, SocketMessage.class);
                emitter.onNext(socketMessage);
                emitter.onComplete();
            }
        })
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<SocketMessage>() {
                    @Override
                    public void accept(SocketMessage socketMessage) throws Exception {
                        if (socketMessage.getCode() == SocketMessage.TYPE_USER) {
                            // 夹到娃娃

                        } else if (socketMessage.getCode() == SocketMessage.TYPE_SAY_HELLO) {
                            // 连接招呼语
                        }
                    }
                });


    }

    @Override
    public void onError(WebSocket conn, Exception ex) {
    }

    @Override
    public void onStart() {

    }
}
    appendAndScrollLog("初始化WebSocket服务...\n");
    WSServer wsServer = new WSServer(18104);
    wsServer.setMainActivity(MainActivity.this);
    wsServer.setConnectionLostTimeout(5);
    wsServer.setReuseAddr(true);
    wsServer.start();
    appendAndScrollLog("初始化WebSocket服务完成\n");

client

在 client 端,目前需要做的人物有断开重连以及数据发送的操作。断开重连的时候需要在新的子线程中进行,否则会报如下错误:

You cannot initialize a reconnect out of the websocket thread. Use reconnect in another thread to insure a successful cleanup

因此,我们每次断开重新的时候是需要在新的子线程中进行的。除此之外,在发送数据的时候,如果刚好 socket 没有连接上,那么发送数据是会报异常的,因此我们有数据要发送的时候如果 socket 没有连接,那么就先缓存到本地,等到 socket 连接上之后再把滞留的数据一次性发送出去。

    implementation 'org.java-websocket:Java-WebSocket:1.3.9'

class WSClient extends WebSocketClient {

    private static final String TAG = "WSClient";
    private static WSClient instance;
    private static URI sUri;
    private WSReceiver mWSReceiver;
    private Disposable mReconnectDisposable;
    private ConnectCallback mConnectCallback;

    /**
     * step 1:需要先调用,设置 url
     * @param uri
     */
    public static void setUri(URI uri){
        sUri = uri;
    }

    /**
     * step 1:
     * 需要先调用,设置服务端的 url
     * @param ipAddress
     * @param port
     */
    public static void setUri(String ipAddress,int port){
        try {
            sUri = new URI(String.format("ws://%s:%d", ipAddress, port));
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }


    public static WSClient getInstance(){
        if (instance == null) {
            synchronized (WSClient.class){
                if (instance == null) {
                    instance = new WSClient(sUri);
                }
            }
        }
        return instance;
    }

    /**
     * step 2:连接 websocket
     */
    public void onConnect(){
        setConnectionLostTimeout(Config.instance.webSocketTimeoutSeconds);
        setReuseAddr(true);
        connect();
    }

    private WSClient(URI server) {
        super(server);
        // 初始化消息发送者
        WSSender.getInstance().setWSClient(this);
        // 初始化消息接收者
        mWSReceiver = new WSReceiver();
        mWSReceiver.setWSClient(this);
        mWSReceiver.setWSSender(WSSender.getInstance());

    }


    @Override
    public void onOpen(ServerHandshake handshakedata) {
        Log.d(TAG, "onOpen: ");
        MainActivity.appendAndScrollLog("websocket 已连接\n");
        Observable.just("")
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        if (mConnectCallback != null) {
                            mConnectCallback.onWebsocketConnected();
                        }
                    }
                });

        // 清除滞留的所有消息
        WSSender.getInstance().clearAllMessage();

    }

    @Override
    public void onMessage(String message) {
        Log.d(TAG, "onMessage: ");
        mWSReceiver.handlerMessage(message);
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        Log.d(TAG, "onClose: ");
        MainActivity.appendAndScrollLog(String.format("websocket 已断开,断开原因:%s\n",reason));
        Observable.just("")
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        if (mConnectCallback != null) {
                            mConnectCallback.onWebsocketClosed();
                        }
                    }
                });
        onReconnect();
    }

    @Override
    public void onError(Exception ex) {
        if (ex != null) {
            Log.d(TAG, "onError: "+ex.getMessage());
            MainActivity.appendAndScrollLog(String.format("websocket 出现错误,错误原因:%s\n",ex.getMessage()));
        }
        onReconnect();
    }


    public void onReconnect() {
        if (mReconnectDisposable != null
                && !mReconnectDisposable.isDisposed()){
            return;
        }
        mReconnectDisposable = Observable.timer(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG, "websocket reconnect");
                        WSClient.this.reconnect();
                        mReconnectDisposable.dispose();
                    }
                });

    }

    public void setConnectCallback(ConnectCallback mConnectCallback) {
        this.mConnectCallback = mConnectCallback;
    }

    public interface ConnectCallback{
        void onWebsocketConnected();
        void onWebsocketClosed();
    }
}

/**
 * Created by runla on 2018/10/26.
 * 文件描述:Websocket 的消息发送者
 */

public class WSSender {
    private static final String TAG = "WSSender";
    public static final int MAX_MESSAGE_COUNT = 128;
    private static WSSender instance;
    private WSClient mWSClientManager;
    // 消息队列
    private LinkedList<String> mMessageList = new LinkedList<>();

    private WSSender() {
    }

    public static WSSender getInstance() {
        if (instance == null) {
            synchronized (WSSender.class) {
                if (instance == null) {
                    instance = new WSSender();
                }
            }
        }
        return instance;
    }

    public void setWSClient(WSClient wsClientManager) {
        this.mWSClientManager = wsClientManager;
    }

    /**
     * 发送所有滞留的消息
     */
    public void clearAllMessage() {
        if (mWSClientManager == null) {
            return;
        }

        while (mMessageList.size() > 0
                && mMessageList.getFirst() != null) {
            Log.d(TAG, "sendMessage: " + mMessageList.size());
            mWSClientManager.send(mMessageList.getFirst());
            mMessageList.removeFirst();
        }
    }

    /**
     * 发送消息,如果消息发送不出去,那么就等到连接成功后再次尝试发送
     *
     * @param msg
     * @return
     */
    public boolean sendMessage(String msg) {
        if (mWSClientManager == null) {
            throw new NullPointerException("websocket client is null");
        }
        if (TextUtils.isEmpty(msg)) {
            return false;
        }
        // 将需要发送的数据添加到队列的尾部
        mMessageList.addLast(msg);

        while (mMessageList.size() > 0
                && mMessageList.getFirst() != null) {
            Log.d(TAG, "sendMessage: " + mMessageList.size());
            if (!mWSClientManager.isOpen()) {
                // 尝试重连
                mWSClientManager.onReconnect();
                break;
            } else {
                mWSClientManager.send(mMessageList.getFirst());
                mMessageList.removeFirst();
            }
        }

        // 如果消息队列中超过我们设置的最大容量,那么移除最先添加进去的消息
        if (mMessageList.size() >= MAX_MESSAGE_COUNT) {
            mMessageList.removeFirst();
        }
        return false;
    }
}

/**
 * Created by runla on 2018/10/26.
 * 文件描述:Websocket 的消息接收者
 */

public class WSReceiver {
    private WSClient mWSClientManager;
    private WSSender mWSSender;
    private OnMessageCallback onMessageCallback;
    public WSReceiver() {
    }


    public void setWSClient(WSClient mWSClientManager) {
        this.mWSClientManager = mWSClientManager;
    }

    public void setWSSender(WSSender mWSSender) {
        this.mWSSender = mWSSender;
    }

    /**
     * 处理接收消息
     * @param message
     */
    public void handlerMessage(String message){

        if (onMessageCallback != null){
            onMessageCallback.onHandlerMessage(message);
        }
    }

    public void setOnMessageCallback(OnMessageCallback onMessageCallback) {
        this.onMessageCallback = onMessageCallback;
    }

    public interface OnMessageCallback{
        void onHandlerMessage(String message);
    }
}
    appendAndScrollLog("初始化WebSocket客户端...\n");
    WSClient.setUri( Config.instance.centralServerAddress, Config.instance.webSocketPort);
    WSClient.getInstance().onConnect();
    WSClient.getInstance().setConnectCallback(MainActivity.this);
    appendAndScrollLog("初始化WebSocket客户端完成\n");
// 清除滞留的所有消息
WSSender.getInstance().clearAllMessage();

// 发送消息
WSSender.getInstance().sendMessage(msg);

数据库存储

在中控端,我们需要显示排行版,用来显示夹中娃娃机的用户在本月及本周夹中娃娃的排行,因此我们需要再中控端保存用户的夹中娃娃数量以及个人的其他信息,GreenDAO 是一款开源的面向 Android 的轻便、快捷的 ORM 框架,将 Java 对象映射到 SQLite 数据库中,我们操作数据库的时候,不在需要编写复杂的 SQL语句, 在性能方面,GreenDAO 针对 Android 进行了高度优化, 最小的内存开销 、依赖体积小,同时还是支持数据库加密。关于 GreenDAO 的用法我就不在这里做,具体的用法可以参考官网 GreenDAO


写在最后

关于整个系统的架构搭建过程中遇到了好多坑,以上是我为这个项目提供的部分解决方案,当前全部的是不可能都放写出来的,此项目目前已经在西安和成都等地都有门店点了,据反馈,利润极大,不过这种类型的项目红利期不会太长,估计也是 2~3 年左右吧。如果有需要我们为 口红机开发 或者是 娃娃机开发 提供解决方案的,可以联系我们,目前我们在这个方面已经有相对较为成熟的解决方案了。