Python实现赛尔号玩家地域分布统计

说在前面

赛尔号的新巅峰算是上线了,也就意味着我之前写的巅峰数据统计插件算是彻底报销了,但是我决定利用一下插件后台记录下来的数据统计一下赛尔号玩家的地域分布情况,毕竟10w个ip数据,还是有一点准确性的吧😂😂😂

此ip为记录统计插件使用人数产生的附属数据,本人不会公开ip具体数据,也不会用作其他非法用途

ip转address

这部分一开始还是比较头疼的,毕竟高达10w份ip数据,如果使用第三方api接口进行查询的话,太浪费时间了,搞不好多线程查询还可能把我ip地址拉黑了,所以我选择了使用纯真离线ip数据库:

纯真网络,中国历史最悠久的IP地理位置库 (cz88.net)

最重要的是这个数据库是每周更新并且完全免费的,也没有太高的精确度要求,所以使用这个足矣,这里百度了一下纯真ip数据库的格式,并使用python实现了对数据库的调用查询:

import socket
import struct
class CzIp:
    def __init__(self, db_file="qqwry.dat"):
        self.f_db = open(db_file, "rb")
        bs = self.f_db.read(8)
        (self.first_index, self.last_index) = struct.unpack("II", bs)
        self.index_count = int((self.last_index - self.first_index) / 7 + 1)
        self.cur_start_ip = None
        self.cur_end_ip_offset = None
        self.cur_end_ip = None
        # print(self.get_version(), " 纪录总数: %d 条 "%(self.index_count))

    def get_version(self):
        """
        获取版本信息,最后一条IP记录 255.255.255.0-255.255.255.255 是版本信息
        :return: str
        """
        s = self.get_addr_by_ip(0xFFFFFF00)
        # print(s)
        return s

    def _get_area_addr(self, offset=0):
        if offset:
            self.f_db.seek(offset)
        bs = self.f_db.read(1)
        (byte,) = struct.unpack("B", bs)
        if byte == 0x01 or byte == 0x02:
            p = self.getLong3()
            if p:
                return self.get_offset_string(p)
            else:
                return ""
        else:
            self.f_db.seek(-1, 1)
            return self.get_offset_string(offset)

    def _get_addr(self, offset):
        """
        获取offset处记录区地址信息(包含国家和地区)
        如果是中国ip,则是 "xx省xx市 xxxxx地区" 这样的形式
        (比如:"福建省 电信", "澳大利亚 墨尔本Goldenit有限公司")
        :param offset:
        :return:str
        """
        self.f_db.seek(offset + 4)
        bs = self.f_db.read(1)
        (byte,) = struct.unpack("B", bs)
        if byte == 0x01:  # 重定向模式1
            country_offset = self.getLong3()
            self.f_db.seek(country_offset)
            bs = self.f_db.read(1)
            (b,) = struct.unpack("B", bs)
            if b == 0x02:
                country_addr = self.get_offset_string(self.getLong3())
                self.f_db.seek(country_offset + 4)
            else:
                country_addr = self.get_offset_string(country_offset)
            area_addr = self._get_area_addr()
        elif byte == 0x02:  # 重定向模式2
            country_addr = self.get_offset_string(self.getLong3())
            area_addr = self._get_area_addr(offset + 8)
        else:  # 字符串模式
            country_addr = self.get_offset_string(offset + 4)
            area_addr = self._get_area_addr()
        return country_addr + " " + area_addr

    def dump(self, first, last):
        """
        打印数据库中索引为first到索引为last(不包含last)的记录
        :param first:
        :param last:
        :return:
        """
        if last > self.index_count:
            last = self.index_count
        for index in range(first, last):
            offset = self.first_index + index * 7
            self.f_db.seek(offset)
            buf = self.f_db.read(7)
            (ip, of1, of2) = struct.unpack("IHB", buf)
            address = self._get_addr(of1 + (of2 << 16))
            print("%d %s %s" % (index, self.ip2str(ip), address))

    def _set_ip_range(self, index):
        offset = self.first_index + index * 7
        self.f_db.seek(offset)
        buf = self.f_db.read(7)
        (self.cur_start_ip, of1, of2) = struct.unpack("IHB", buf)
        self.cur_end_ip_offset = of1 + (of2 << 16)
        self.f_db.seek(self.cur_end_ip_offset)
        buf = self.f_db.read(4)
        (self.cur_end_ip,) = struct.unpack("I", buf)

    def get_addr_by_ip(self, ip):
        """
        通过ip查找其地址
        :param ip: (int or str)
        :return: str
        """
        if type(ip) == str:
            ip = self.str2ip(ip)
        L = 0
        R = self.index_count - 1
        while L < R - 1:
            M = int((L + R) / 2)
            self._set_ip_range(M)
            if ip == self.cur_start_ip:
                L = M
                break
            if ip > self.cur_start_ip:
                L = M
            else:
                R = M
        self._set_ip_range(L)
        # version information, 255.255.255.X, urgy but useful
        if ip & 0xFFFFFF00 == 0xFFFFFF00:
            self._set_ip_range(R)
        if self.cur_start_ip <= ip <= self.cur_end_ip:
            address = self._get_addr(self.cur_end_ip_offset)
        else:
            address = "未找到该IP的地址"
        return address

    def get_ip_range(self, ip):
        """
        返回ip所在记录的IP段
        :param ip: ip(str or int)
        :return: str
        """
        if type(ip) == str:
            ip = self.str2ip(ip)
        self.get_addr_by_ip(ip)
        range = self.ip2str(self.cur_start_ip) + " - " + self.ip2str(self.cur_end_ip)
        return range

    def get_offset_string(self, offset=0):
        """
        获取文件偏移处的字符串(以'\0'结尾)
        :param offset: 偏移
        :return: str
        """
        if offset:
            self.f_db.seek(offset)
        bs = b""
        ch = self.f_db.read(1)
        (byte,) = struct.unpack("B", ch)
        while byte != 0:
            bs += ch
            ch = self.f_db.read(1)
            (byte,) = struct.unpack("B", ch)
        return bs.decode("gbk")

    def ip2str(self, ip):
        """
        整数IP转化为IP字符串
        :param ip:
        :return:
        """
        return (
            str(ip >> 24)
            + "."
            + str((ip >> 16) & 0xFF)
            + "."
            + str((ip >> 8) & 0xFF)
            + "."
            + str(ip & 0xFF)
        )

    def str2ip(self, s):
        """
        IP字符串转换为整数IP
        :param s:
        :return:
        """
        (ip,) = struct.unpack("I", socket.inet_aton(s))
        return (
            ((ip >> 24) & 0xFF)
            | ((ip & 0xFF) << 24)
            | ((ip >> 8) & 0xFF00)
            | ((ip & 0xFF00) << 8)
        )

    def getLong3(self, offset=0):
        """
        3字节的数值
        :param offset:
        :return:
        """
        if offset:
            self.f_db.seek(offset)
        bs = self.f_db.read(3)
        (a, b) = struct.unpack("HB", bs)
        return (b << 16) + a


if __name__ == "__main__":
    cz = CzIp()
    f = open("ip.txt")
    fi = open("ip_address.txt", "w")
    ip_data = f.read().split()
    for i in ip_data:
        fi.write(cz.get_addr_by_ip(i) + "\n")
    f.close()
    fi.close()

数据清洗

获得了ip转address的数据以后,接下来进行数据清洗,我们要统计各个省份的ip_address数量,依旧python实现

f = open("ip_address.txt")
ip_data = f.read().split("\n")
provinces = ['吉林', '天津', '台湾', '云南', '海南', '山西', '四川', '山东', '江西', '浙江', '辽宁', '福建', '湖北', '江苏', '内蒙古', \
             '宁夏', '香港', '新疆', '西藏', '甘肃', '广东', '陕西', '上海', '湖南', '重庆', '澳门', '河北', '青海', '北京', '广西', \
            '安徽', '河南', '贵州', '黑龙江']
province_num = {}
for address in ip_data:
    for province in provinces:
        if province in address:
            try:
                province_num[province] += 1 
            except:
                province_num[province] = 1
            break
    else:
        try:
            province_num["其他"] += 1 
        except:
            province_num["其他"] = 1
print(province_num)
print(sum(province_num.values()))

这部分还是比较简单的,获得了省份对应人数的字典。

绘制统计图

这里我选择使用python的pyecharts库进行统计图表绘制,感觉还是很方便的,附上官网和画廊地址 ->

简介 - pyecharts - A Python Echarts Plotting Library built with love.

Document (pyecharts.org)

from pyecharts.charts import Bar
from pyecharts.options import *
provinces = {
    "贵州": 765,
    "河南": 5764,
    "浙江": 7955,
    "辽宁": 1910,
    "江苏": 8582,
    "四川": 5187,
    "新疆": 422,
    "黑龙江": 1426,
    "广东": 13580,
    "北京": 2437,
    "山东": 6174,
    "湖北": 4336,
    "广西": 3306,
    "江西": 4222,
    "河北": 3611,
    "湖南": 3206,
    "陕西": 3377,
    "安徽": 3922,
    "吉林": 906,
    "山西": 2214,
    "云南": 1252,
    "福建": 3579,
    "天津": 1300,
    "上海": 1911,
    "海南": 922,
    "重庆": 1881,
    "甘肃": 1076,
    "宁夏": 275,
    "西藏": 50,
    "内蒙古": 548,
    "香港": 125,
    "台湾": 155,
    "青海": 120,
    '澳门': 6,
    "其他": 5056,
}

x_data = list(provinces.keys())
y_data = list(provinces.values())
bar = Bar()
bar.add_xaxis(x_data)
bar.add_yaxis("人", y_data, label_opts=LabelOpts(position="right"))
bar.reversal_axis()
bar.set_global_opts(
    title_opts=TitleOpts(title="赛尔玩家地域分布",pos_left="center",pos_bottom="bottom",),
    yaxis_opts=AxisOpts(axislabel_opts=LabelOpts(font_size=10)),
)
bar.render("赛尔玩家地域分布.html")

这里的图标绘制是比较基础简陋的,其实可以绘制出各种花里胡哨吊炸天的精美图表,最终实现的效果图:

赛尔玩家地域分布

附上在线演示网址:

Awesome-pyecharts (yuyuqaq.cn)

pyecharts部分问题解决

pyecharts默认绘制出来的图标是左上角显示,并且宽高是写死的,通过查询,可以通过下述方法实现图表居中大图显示

更改xxx\Python\Pythonxx\Lib\site-packages\pyecharts\render\templates路径下macro文件第二行的css属性,更改为 ->

margin: 0 auto;
width: 90%;
height: 90vh;
display: flex;
justify-content: center;
align-items: center;