Nico's Blog

兴趣使然的Coder


  • 首页

  • 标签

  • 分类

  • 归档

存储介质

发表于 2022-02-22

前言

开发一个简单的应用,我们只需要掌握mysql的增删改查即可,这看起来是一件非常easy的事情,而事实确实如此,但是前提是我们的目标是冲着简单去的。再将小目标强化一下,我们想开发复杂一些的应用,这不得不需要我们进一步加强对mysql的了解,例如索引等调优技术,随着目标的不断加强,我们需要对于整个应用所涉及到的技术拥有更加透彻的认知。

从此刻开始,让我们将思想坠入硬件层,深入但是又不太深入的了解一下各种存储介质的存储原理。

HDD

机械硬盘HDD(Hard Disk Drive)是一款磁物质存储类型的硬盘,它通过磁头转轴带动磁头在磁盘上运动来达到存储的目的,在此过程中,磁头不会与磁盘接触,电机控制磁头的电磁流来影响磁盘上磁极发生正负变化,从而做到存储二进制数据。

image.png

HDD的大体结构也并不算复杂。从平面来看,它主要由磁头、盘片、电机、磁头、转轴等组成。从侧面来看,HDD不止一个盘片,每个盘片都有正反两个盘面,每个盘面都配有一个磁头。

image.png

HDD在逻辑上又划分为磁道、柱面、扇区。每个盘面上被划分成许多同心圆,每个同心圆的圆周被称为磁道,离转轴最远(最外圈)的磁道为0号磁道,读写将会从0号磁道开始向内圈移动。多个盘面垂直方向同轴磁道组成了柱面。同时,每个盘面又被切分成多个扇形区域,每个扇形区域被称为扇区。扇区是机械硬盘的基本读写单位,每个扇区容量为512Byte,扇面外圈的扇区虽然比内圈看着要大一些,实则磁物质密度会比内圈低很多,最终容量并不会因此而改变。

值得一提的是,因为磁头切换磁道需要臂杆做机械运动,所以效率很低,而切换柱面只需要电控磁头,所以所有磁头同一时刻都是同轴的,读写过程也并不是按照盘面来写,而是先写磁道,磁道满了以后向下写入同一柱面的其它磁道,当整个柱面都写满以后再切换到下一个柱面。由于所有盘片都固定在转轴上,而磁头也只能验证盘片半径方向运动,所以多盘机械硬盘面多多个读写指令,也只能串行处理。

当需要做数据读写时,首先拿到要读数据的盘面号、扇区号和磁头号,然后转轴带动盘片到对应的扇区,机械臂带动对应的磁头移动到对应的柱面,读写完毕之后再进行下一个读写指令的处理。

DRAM

动态随机存储器DRAM(Dynamic Random Access Memory)又简称为内存,相比ROM(Read-Only Memory)的非易失性,DRAM的数据会随着断电而消失,因此,DRAM一般被用于临时存储,而非持久化场景。另外,因为电容存在漏电现象,长时间的断电会导致电荷流失以至于数据失真,所以DRAM会周期性的刷新充电,这也是“动态”的由来。

DRAM的存储的原理是通过电容来存储电荷,通过电荷量来表示二进制中的0和1。

image.png

上图为DRAM存储电路逻辑图,整个数字电路做的事情只是为了存储1bit的数据,其中电容(Capacitor)用来存储电荷,晶体管(Transistor)用来控制外部对电容的访问,Worldline为字线(行),Bitline为数据线(列),整个电路组成了DRAM存储的最小单元cell,实际应用中,无数个cell以矩阵的形式排列,这种矩阵被称为bank,一个bank中的cell通过行地址和列地址进行唯一标识。

image.png

在数据读写之前,需要先对目标cell进行寻址。首先通过行地址解码器定位到行地址线路并发出行地址选通脉冲RAS(Row Address Strobe)信号,因为Capacitor相比Bitline上的电容值相差太大,当从Capacitor上读取数据时对于Bitline上电压影响非常小,导致无法有效识别具体数据,所以在RAS信号激活后,需要使用读出放大器(Sense Amplifier)将电容充放电信号放大,这个过程称为Activate。

当RAS信号被激活后,通过列地址解码器定位到列地址线路并发出列地址选通脉冲CAS(Column Address Strobe)信号,之后放开Transistor,通过改变外部电压状态来感应或改变Capacitor中的电荷以完成数据读写,这个过程称为Precharge。

在读取过程中,因为Capacitor与Bitline上的电压差,这样会使Capacitor上的电容量发生变化,导致下次读取失真,为了保证每次读取到的数据都是准确的,在读取完毕后需要再次充电操作。

为了更好的描述性能,引入了描述内存性能的四个时序参数:CL、TRCD、TRP和TRAS,单位为时钟周期(ns级),每个参数周期如下:

image.png

因为DRAM定时充电刷新的特性,性能会受到一定的影响,而静态随机存储器SRAM(Static Random Access Memory)弥补了这个缺陷,它不需要定时刷新,只要存储器保持通电,数据就可以持久保存,当然断电后仍然会丢失。SRAM之所以能做到这一点,是因为它去掉了电容,通过两个耦合的反相器的来锁住输出状态,有兴趣可以私下深究。

SSD

固态硬盘SSD(Solid State Disk或Solid State Drive)是基于半导体闪存(NAND Flash)作为存储介质的硬盘,相比传统的机械硬盘,它去掉了机械固件,引入了主控芯片来代替机械操作,这意味着它的性能、功耗及可靠性会比HDD高出一大截。

image.png

Flash闪存是一种非易失性的内存,这意味着即使断电也不会丢失数据,使用双层浮栅(Floating Gate)MOS管作为存储单元cell,与DRAM 1T1C(一个Transistor + 一个Capacitor)存储方式不同的是,浮栅晶体管上下被绝缘层包围,可以将电子锁在其中,避免数据因掉电而消失。

image.png

SSD目前的存储单元根据存储量的大小分为SLC(Single Level Cell)、MLC(Multiple Level Cell)和TLC(Triple Level Cell),就像名称所蕴含的意思一样,SLC可存储1bit数据,MLC可存储2bit数据,TLC可存储3bit数据。SLC是最简单的,其内部状态只有0和1,而MLC要同时存在2bit数据,其内部状态数量增加到了 2^2 个 ,分别为是00/01/10/11,同理,TLC也可以推出它的内部状态数量为 2^3 个,分别为000/001/010/011/100/101/110/111,可以发现,随着存储量的增加,存储单元内部的状态呈指数倍上升,要识别出具体的状态,就要将电子个数进行更细致的划分,这也就意味着读写时的控制更加精细,如果抛开存储数据量的差异,单纯在性能上作比较的话,SLC是最强的,其次是MLC。

image.png

与DRAM类型,SSD结构上同样分层,主控通过多个通道Channel操作多块Flash闪存颗粒DIE(硅晶片,也被称为逻辑单元LUN),一个DIE由多个Plane组成,一个Plane由多个Block组成,每个Block下包含若干个Page。其中Block是擦除的最小单位,Page是最小的读写单位。每个Plane都有独立的Cache Register和Page Register,大小与Page一样,目的是做主控和存储介质之前的缓冲区。

image.png

Page大小一般为4KB的整数倍,作为闪存读写最小单位,哪怕读写1byte数据,也要访问整个Page。同时,Page不允许覆盖写,举个例子,先写入1byte数据到Page1,再写入1byte数据时不会因为Page1未满就追加进去,而是先读出Page1中之前的1byte数据并与新数据合并后写入Page2,之后将Page1标记为stale,等待被GC。

image.png

因为闪存不能覆盖写,想要在老地方写数据,只能先擦除,这个过程会对闪存块造成一定的磨损,当闪存块的寿命结束,将会成为坏块,另外闪存块读的次数太多也会导致上面数据失真,所以SSD中的闪存转换层FTL(Flash Translation Layer)至关重要。它通过来维护一张映射表(Map Table)来完成用户逻辑地址到闪存物理地址的转换,另外为了避免上述一系列问题,FTL还做了很多“份外”之事,包括但不仅限于:

  • 垃圾回收(Garbage Collection):当闪存写入达到阈值时,触发GC回收,腾出更多的写入空间。
  • 磨损平衡(Ware Leveling):为了延长Block寿命,要避免对某些Block高频读写,保证所有块的均衡写入。
  • 消除读干扰(Read Disturb):当闪存块读取达到阈值时,FTL将这些数据迁移至其它块。

¶映射管理

用户逻辑地址到闪存物理地址的转换是FTL本职工作,FTL在内部维护了一张映射表,其中维护了用户逻辑空间到闪存空间中Block或Page的映射关系。

在进行读写时,用户通过逻辑地址访问SSD,会先经过映射表的转译来获取闪存物理地址再进行之后的操作,假如是对新块写操作,此时映射表并没有当前映射关系,经过FTL挑选合适的块写入后,映射表中会将当前映射记录下来。

映射表本身也会占用一定的空间用于自身的存储,SSD也都会为此类事情预留一部分空间。由于映射表的访问很频繁,通常会将映射表存储在RAM中,市面上有些SSD的主板上会自带一块小型DRAM用于映射表的缓存,而另外一部分SSD则选择会使用主机内存。

¶垃圾回收

因为闪存不能覆盖写的特性,每次写入老块之前都要先擦除,这个过程就是先将块数据读出,将有用数据读取出来,然后将当前块擦除,再将有效数据和当前数据一起写入目标块中,这个过程做的操作简称为读擦写,而这个过程为称为垃圾回收(GC)。

image.png

新盘刚开始写入过程中,因为是空盘,所以不会触发GC,当硬盘空间可用量小于某一个阈值时,GC操作将会触发。在GC之前,用户数据会先写入预留空间OP(Over Provisioning)中,这个空间对用户不可见,所以通常我们拿到收的硬盘可用空间往往会比规格容量小一些。

很明显,这种GC会使原本简单的操作变得复杂且低效,并且相比原本要写入的数据,会对块中有效数据做额外的写入,这种现象称为写放大WA(Write Amplification),计算公式为:WA = 实际写入量 / 用户数据量,其中实际写入量 = 块有效数据量 + 用户数据量。可以看出,块中的垃圾密度越大,有效数据就越小,WA也随之而然的跟着变小。

另外,SSD中GC有两种触发方式,一种是前台垃圾回收Foreground GC,它在用户写入数据无可用块时主动触发,另外一种是后台垃圾回收Background GC,它在SSD空闲时由内部控制触发。前者出发时,用户写入需要额外等待GC的完成,后者则尽可能避免前者带来的时间的耗费。

¶4K对齐

传统HDD扇区单位一直习惯于512Byte,有些文件系统默认保留前63个扇区,也就是前512 * 63 / 1024 = 31.5KB,假设闪存Page和簇(OS读写基本单位)都大小为4KB,那么一个Page对应着8个扇区,用户数据将于第8个Page的第3.5KB位置开始写入,导致之后的每一个簇都会跨两个Page,读写处于超界处,这对于闪存会造成更多的读损及读写开销。

除了OS层的4K对齐至关重要以外,在文件写入过程中仍然需要关注4K对齐的问题。假设Page大小仍然为4KB,向一个空白文件写入5KB数据,此时需要2个Page来存储数据,并且可以写满Page1,而Page2只写入1KB数据,当再次向文件顺序写入数据时,仍然需要读取Page2的数据与新写入数据合并再写入新的Page中,此时额外的开销已经产生了。

对于这种情况我们可以人工补齐4K来避免额外的读开销,当然这个优化不是必须的,同时要结合自己的实际场景来做抉择。

¶读写顺序

顺序读写和随机读写一直以来都是读写性能优化中最重要的一点,无论HDD还是SSD,读写方式的不同所带来的影响也不同。随机读写意味着要重新寻址,对于HDD来说需要磁头的切换甚至柱面的切换,SSD则需要重新定位行列地址线,相比于HDD,SSD不需要机械运动,因此随机读写带来的影响会小很多,而顺序读写可以免去再次寻址过程,在应用设计允许的情况下,顺序读写都是最优之选。

对于SSD来说,顺序写可以保证数据有序的写在相邻的Page和Block中,当数据或文件删除时,GC垃圾也相对比较集中,可以大大降低WA带来的影响。

主流操作系统都会有预读,顺序读可以提高预读缓存命中率,减少硬件读取次数。所以,如果可以,请尽可能顺序读写。

参考

  • 深入了解机械硬盘的读写原理和碎片的产生
  • 内存(DRAM)的工作原理及时序介绍
  • DRAM 原理 1 :DRAM Storage Cell
  • Meaning Behind Ram RAS and CAS
  • 浅聊SRAM和DRAM的区别
  • SSD 背后的奥秘
  • 深入浅出SSD - 固态存储核心技术原理与实战

【参赛总结】第二届云原生编程挑战赛-冷热读写场景的RocketMQ存储系统设计

发表于 2022-02-22

¶引子

在一个浑浑噩噩的下午,百无聊赖的我像往常一样点开了划水交流群,细细品味着老哥们关于量子力学的讨论。嬉戏间,平常水不拉几的群友张三忽然发了一张大大的橙图,我啪的一下点开了,很快啊,仔细观摩后发现原来是2021第二届云原生编程挑战赛报名的海报,暗暗的想起了被我鸽掉的前几届,小手不自觉地打开了链接并且一键三连。

每个人的心里都有一个童心未泯的自己,这次比赛就像一场游戏一样让我深陷其中,三岔路口,我选择了存储领域,谁承想这决定会让我在接下来的两个月里减少百分之N的发量。

¶读题

赛题目的是实现简单的消息读取与存储,程序需要实现append和getRange方法,并依次通过性能评测与正确性评测,性能评测耗时最少者居高。

¶评测环境

Linux下的4核8G服务器,配置400G ESSD PL1云盘,吞吐可达320MiB/s,60G Intel 傲腾持久内存PMem(Persistent Memory),由参考文档可推测为第一代持久内存,代号为AEP。

赛题编程语言限制为Java8,JVM配置为6G堆内+2G堆外。

¶性能评测

评测程序首先会创建10~50个不等的线程,每个线程随机分配若干个topic进行写入,topic总数量不超过100个。每个topic之下又分为若干个queue,总数量不超过5000个,调用append方法后返回当前数据在queue中的offset,由0开始。每次写入数据大小为100B-17KiB区间随机,当写满75G数据后,会挑选一半的queue由下标0(头)开始读取,另外一半从当前最大下标(尾)开始读取,并保持之前的写入压力继续写入50G数据,最后一条数据读取完毕后停止计时。

¶正确性评测

同样会使用N个线程写入数据,在写入过程中会重启ECS,之后再读取之前写入成功的数据(返回offset即视为成功),要求严格一致。

¶持久内存

本次比赛多了一个比较陌生的存储介质PMem,它结合了内存的读写性能和持久化的特性,可以在延迟可以控制在纳秒级。

目前主流的实现为非易失性双列直插式内存模块NVDIMM(Non-Volatile Dual In-Line Memory Module,NVDIMM),它是持久内存的一种实现,目前有三种实现标准:

  • NVDIMM-N: 配置同等容量的DRAM和NAND Flash,另外还有一个超大电容,当主机断电后,PMem设备会使用电容中保留的电量保证DRAM的数据同步到闪存中。
  • NVDIMM-F: 使用了适配DDR规格的NAND Flash,通过多个控制器和桥接器将DDR总线信息转化为SATA协议信息来操作闪存的读写。
  • NVDIMM-P: 同样配置了DRAM和NAND Flash,只不过DRAM容量会比闪存少很多,DRAM在其中作为闪存上层的缓存以优化读写性能,同样使用超大电容来保障断电后的脏数据持久。

Intel傲腾第一代持久内存AEP遵循NVDIMM-P标准,实现了非易失性,可以按字节寻址(Byte Addressable)操作,小于1μs的延时,以及集成密度高于或等于DRAM等特性。不同于传统的NAND Flash实现,傲腾持久内存使用了新型非易失性存储器3D-XPoint,其内部是一种全新的存储介质。

Intel傲腾持久内存提供多种操作模式:

  • 内存模式: 此模式下持久内存被当做超大容量的易失性内存使用,其中DRAM被称为近内存(Near Memory),持久化介质被称为远内存(Far Memory),读写性能取决于读写时命中近内存还是远内存。
  • AD模式: 此模式下持久内存直接暴露给用户态的应用程序直接调用,应用程序通过持久内存感知文件系统(PMEM-Aware File System)将用户态的内存空间直接映射到持久内存设备上,从而应用程序可以直接进行加载(Load)和存储(Store)操作。这种形式也被称作DAX,意为直接访问。目前主流的文件系统ext4, xfs 都支持Direct Access的选项(-o dax),英特尔也提供了用于在持久内存上进行编程的用户态软件库PMDK。

本次比赛使用AD模式。

¶分析

首先关注的是正确性评测,写入过程会重启ECS,那么就要保证在append方法return之前数据要落盘,也就是说每个写入请求都要fsync刷盘。另外在重启ECS之后,会清理PMem上的数据,所以数据肯定要在ESSD上保存一份。

总写入数据量为125G,而ESSD提供400G容量,正常写入的情况下不用考虑硬盘GC的问题。除了ESSD空间外,我们还有60G的PMem可用,而且文件系统通常会预留一部分文件空间作紧急情况使用,所以PMem可用容量会更高(实测真实容量为62G左右)。DRAM内存也要尽可能利用起来,首选不受JVM限制的2G堆外,剩下的6G堆内如何使用就要在GC和整体性能之间做抉择了。

¶文件写入

方案1: 每个queue一个文件,这样可以保证顺序读写,但最坏的情况下需要创建100 * 5000 = 500,000个文件,操作系统默认每个用户进程1024个句柄肯定会超限。

方案2: 每个topic一个文件,那么最坏只需要创建100个文件,可以接受,但这意味着多个queue的数据要写入同一个文件中,无法保证顺序读写,不过可以是使用稀疏索引来做块存储。另外因为正确性评测的限制,我们需要在每次写入后手动fsync,所以这种设计下会导致频繁的fsync,也就意味着用户态与内核态之间要频繁的切来切去,另外数据大小范围为100B~17KiB,ESSD在一次写入32K以上数据时才能发挥最优性能,很明显当前设计是打不满ESSD PL1的吞吐的。

enter image description here

方案3: 所有topic共用一个文件,通过对以上弊端的思考,我们应该尽可能每次fsync时写入更多的数据,由于N个线程并发写同一个文件,所以我们可以将N个线程的数据先写入聚合缓冲中后并挂起,等待将缓冲中的数据刷盘后再取消阻塞。这个方案可以保证顺序写随机读,每次写入数据足够多,并且减少了核态的切换次数,但是刷盘变成了串行,或许能得到一个不错的ESSD吞吐,但是对CPU造成了浪费。

在上一个假设上做优化,因为评测环境配置4核CPU,我们将所有线程分为4组,每组对应一个文件,这样既可以保证ESSD的性能,又可以在无法绑核的情况下尽可能压榨所有CPU的性能。

文件读写的API方面,首先放弃传统的FileWriter/FileRead,相比而言,FileChannel提供双向读写能力且更易操控读写数据精度。MMap是另外一种方案,因为它只在创建的时候需要切态,理论上它的读写速度会比FileChannel更快,但是由于种种原因,MMap映射大小受限,这无疑增加了程序设计上的维护成本,另外最终场景每次写入数据量平均在64KB左右,通过Benchmark,FileChannel在这种场景下性能总是优于MMap。最终选定使用FileChannel进行文件读写,另外为了减少用户与向内核态的内存复制,使用DirectByteBuffer用作写入缓冲。

**最终方案:**将所有线程分为4组,充分利用多核CPU,每组对应一个AOF数据文件,每组线程的数据写入缓冲后并挂起,缓冲刷盘后再取消阻塞,返回offset。

¶缓存利用

首先要明确一点,在本次赛题中,无论是DRAM还是PMem,都不能利用它们用来做数据的持久化(PMem正确性阶段重启后会做数据清理),ESSD是必须要求写入的。因此,缓存的主要利用方向在于提高读性能。

首先是性能最快但是容量最小的DRAM,官方不允许使用unsafe来额外分配堆外的堆外内存,所以可供我们使用的DRAM只有2G的堆外以及6G的堆内,又由于JVM的GC机制外加程序本身的业务流程需要一定的内存开销,所以6G的堆内可供我们用来做数据存储的部分大打折扣(实际测下来可以用到3.2G),而堆外内存会有一部分用于文件读写缓冲,所以堆外内存可用量也会小于2G。另外就是62G的持久内存PMem,由于其性能优于ESSD数百倍,容量远大于DRAM,且ext4支持dax模式,可直接用FileChannel操作读写,对于它的合理使用直接决定了最终成绩的好坏。

再回到性能评测上进行分析,我们将整个过程分为是三个阶段(重点,下文要考):

  • 一阶段: 先写入75G的数据。
  • 二阶段: 评测程序随机挑选一半的queue从头开始读,另一半从结尾开始读,并在读的同时,继续写入50G的数据。
  • 三阶段: 随着时间的推移,最终读取的offset点位会慢慢追赶上当前写入的点位,此阶段中刚写入的数据有可能下一刻被读取。

经过分析,我们需要在一阶段尽可能的将数据写入缓存,这样二阶段读取时可以减少ESSD的命中率。由于二阶段会有一半的queue从结尾开始读数据,这也就意味着这些queue之前的数据可以被淘汰,淘汰后的缓存可以复用于之后写入的数据。另外由于二阶段的过程是边读边写,读后的缓存也可以投入复用。

所以理论上二阶段所有写入的数据全部可以复用到淘汰后的缓存。到了三阶段后,应该尽可能使用性能最高的DRAM来存储热数据。

**最终方案:**一阶段首先将缓存写入大约5G的DRAM中,之后的数据写入62G的PMem中(此过程的ESSD一直保持着写入),每个记录的缓存信息保存在对应的queue中。来到二阶段后,将淘汰的缓存按介质类型及大小放入不同的缓存池,之后写入的数据会优先向DRAM缓存池申请缓存块,其次是PMem缓存池。

当然,前期的分析也只能基于理论,最终方案的背后是无数个日日夜夜的测试和思考(卷就完了。

¶整体方案

QQ截图20211206174435.png

一阶段开始,将所有线程随机分为4组,每组对应1个AOF文件,在写入ESSD的同时,异步写入DRAM或PMem中。理论上在写入 5G + 62G = 67G 数据后缓存用尽,从此刻开始到写满75G之前都只是单纯写硬盘,所有的异步任务也将在此期间全部执行完毕。

二阶段开始,每次读取都会淘汰失效的缓存并放入缓存池中,写入过程中会优先按照记录大小从缓存池中获取到相应的缓存块,理想情况下每次都能申请到对应的缓存块并写入,Missing时记录数据在ESSD上的位置索引。

每次读取时,根据offset从获取对应的数据索引,到索引指定的介质中读取数据并返回。

¶缓存池

本次赛题一共有DRAM,PMem以及ESSD三种介质,而读写的最小颗粒度为100B-17KiB的数据,我们将多个介质的的操作抽象为 Data 类,它提供单条数据读写功能,每种介质单独实现抽象方法,其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
public abstract class Data {
// 缓存块大小
protected int capacity;
// 数据在文件中开始存储的位置
protected long position;
// 从介质中读取
public abstract void get(ByteBuffer buffer);
// 从介质中写入
public abstract void set(ByteBuffer buffer);
// 从介质中清除
public abstract void clear();
}

在一阶段中,会按照写入大小创建对应介质的Data,它记录了这条数据在当前介质中的索引信息(如果是DRAM则直接存放ByteBuffer指针),例如当DRAM和PMem写满时,Data记录的是当前数据在ESSD中的position以及capacity。

二阶段开始时,随着queue的读写会淘汰无效的DRAM和PMem Data并放入对应的缓存池中,二阶段过程中的写入会优先从DRAM缓存池中获取闲置的Data,如果获取失败则从PMem缓存池获取,如果依然失败会降级为SSD Data(相当于不走缓存)。如果获取成功,则将数据写入到当前缓存块中并记录在Queue索引中。

由于二阶段中的缓存块都是从缓存池中获取,因此缓存块大小是固定的,会出现块大小 小于当前写入数据大小的情况,当发生此类情况时,不足的大小会使用预留的堆外内存补救,这块数据被称为 ext,调用clear()方法同时会释放 ext 。

enter image description here

另外,为了减少使用额外的 ext ,缓存池会根据 Data 的capacity大小将之进行分组,当从缓存池获取闲置缓存块时,会根据写入数据的大小到缓存池分组中进行匹配,取出合适区间中的缓存块进行使用。

1
2
3
4
5
6
7
// 17K / 5 五组内存回收池
public LinkedBlockingQueue<Data> getReadBuffer(int cap){
return cap < Const.K * 3.4 ? null : cap < Const.K * 6.8
? readBuffers2 : cap < Const.K * 10.2
? readBuffers3 : cap < Const.K * 13.6
? readBuffers4: readBuffers5;
}

¶数据索引

程序执行过程中,数据写入后会记录一条索引到具体的queue中,由于offset从0开始并有序的特性,每个queue中会实例化一个 ArrayList 来记录该索引,下标即是offset,value的话则为 Data :

1
private final List<Data> records;

¶AOF中的数据格式

由于准确性阶段需要数据的recover,所以直接存储在AOF中的数据需要记录一些额外的索引信息:

enter image description here

当recover时,首先会读取9个Byte来获取头信息,当校验通过后,会根据Data Len来继续读取真实的数据,之后根据TopicId,QueueId,Offset等信息找到目标队列预先建立索引。

¶文件预分配

根据官方渠道得知,评测环境使用的文件系统为ext4,在ext4文件系统下,每次创建一个物理文件会子啊系统中注册一个inode来记录文件的元数据信息以及block索引树的根节点。

当我们对文件进行读写时,首先会从extent tree中寻找合适的block逻辑地址,再从block中拿到硬盘设备中的物理地址方可操作。如果找不到合适的extent或block则需要创建,此过程还涉及到inode中元数据的变动,对内核代码简单追踪可知,最终会调用 ext4_do_update_inode 方法完成inode的更新。

1
2
3
4
5
6
7
8
9
10
11
ext4_write_begin
__block_write_begin
get_block -> ext4_get_block_unwritten
_ext4_get_block
ext4_map_blocks
ext4_ext_map_blocks
ext4_ext_insert_extent
ext4_ext_dirty
ext4_mark_inode_dirty
ext4_mark_iloc_dirty
ext4_do_update_inode

其内部实现过程中会先上文件内全局的自旋锁spin_lock(),在设置完新的block并更新inode元数据后调用spin_unlock()解锁,之后处理脏元数据,这个过程需要记录journal日志。

enter image description here

对于一个空文件进行持续的写入,每当 ext4_map_blocks() 获取block失败,就会执行复杂的流程来创建新的逻辑空间到物理空间的block映射,这种开销对于性能的影响是非常致命的,对于分秒必争的比赛更是如此。

为了避免这段开销,我们可以在写入空白文件之前预先写入足够多的数据,让inode预热一下,之后再从position 0开始写入。这种方法称为 预分配 ,Linux中提供 fallocate 命令完成这种操作,在Java中可以手动完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void fallocate(FileChannel channel, long allocateSize) throws IOException {
if (channel.size() == 0){
int batch = (int) (Const.K * 4);
int size = (int) (allocateSize / batch);
ByteBuffer buffer = ByteBuffer.allocateDirect(batch);
for (int i = 0; i < batch; i ++){
buffer.put((byte) 0);
}
for (int i = 0; i < size; i ++){
buffer.flip();
channel.write(buffer);
}
channel.force(true);
channel.position(0);
Utils.recycleByteBuffer(buffer);
}
}

当然,预分配不是适用于所有场景,本次赛题的计时从第一次append开始,所以有足够的时间在程序初始化过程中完成预分配。再者就是SSD硬盘空间的容量最好足够大,如果容量与要写入的数据相当,预分配后再进行写入时,会导致SSD内部频繁的Foreground GC,性能下降。

¶4K对齐

传统HDD扇区单位一直习惯于512Byte,有些文件系统默认保留前63个扇区,也就是前512 * 63 / 1024 = 31.5KB,假设闪存Page和簇(OS读写基本单位)都大小为4KB,那么一个Page对应着8个扇区,用户数据将于第8个Page的第3.5KB位置开始写入,导致之后的每一个簇都会跨两个Page,读写处于超界处,这对于闪存会造成更多的读损及读写开销。

除了OS层的4K对齐至关重要以外,在文件写入过程中仍然需要关注4K对齐的问题。假设Page大小仍然为4KB,向一个空白文件写入5KB数据,此时需要2个Page来存储数据,Page 1写满了4KB,而Page2只写入1KB,当再次向文件顺序写入数据时,需要将Page2数据预先读出来,然后与新写入数据在内存中合并后再写入新的Page 3中,之前的Page 2则标记为 stale 等待被GC。这种带来的开销被称为写入放大WA(Write Amplification)。

enter image description here

为了减小WA,我们可以人工补充缺少的数据。对于本次赛题,当写入缓冲刷盘前,将写入Buffer的position右移至最近的4KB整数倍点位即可。

¶预读取

二阶段中,我们需要做的是从queue中获取请求区间所有的 Data ,并根据 Data 中的索引信息将真实数据从对应介质中读取出来,而且这个过程通常是批量的,具体数量由入参 fetchNum 控制。

最开始我使用 Semaphore 对批量数据多线程并发读,并且得到了不错的效果。但是背后却埋着不小的坑,由于每次getRange要频繁的对多个线程阻塞和取消阻塞,线程上下文切换带来开销非常严重,有兴趣的读者可以运行以下测试代码(并把 我不能接受打在弹幕里):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
public class Test {
public static void main(String[] args) throws InterruptedException {
int count = 100 * 10000;
int batch = 1;
ThreadPoolExecutor pools =
(ThreadPoolExecutor) Executors.newFixedThreadPool(batch);
Semaphore semaphore = new Semaphore(0);
long start = System.currentTimeMillis();
for (int i = 0; i < count; i ++){
for (int j = 0; j < batch; j ++){
pools.execute(semaphore::release);
}
semaphore.acquire(batch);
}
long end = System.currentTimeMillis();
System.out.println(end - start);
}
}

我不能接受,但是又要保证getRange阶段尽可能并发读取,于是乎我将思路转向了预读取,方法与Page Cache预读类似,举个栗子:当getRange读第0 ~ 10条数据的时候,从线程池中取个线程预读取第10 ~ 20条数据,并将这些数据存储在缓存块中,实际测试中,足够多的PMem缓存块使我们不用担心缓存池匮乏的问题。

¶顺带一提

  • 评测阶段线程数量不固定,好在所有线程几乎同时执行,所以在写入时阻塞一段时间获取到线程数量,之后再对其进行分组。
  • 每个线程要持续运行,所以将线程内数据存入ThreadLocal中,并尽可能复用。
  • 数据格式中的offset或许可以拿掉,每条记录可以省去4 Byte的空间。
  • 两个方法的入参中,Topic的类型为String,但是格式固定为TopicN,可以搞个超大switch方法将其转为int类型,方便之后的存储与读取。

¶结束

不知不觉,比赛已经结束,写这篇文章的时候明天就要上交的PPT还未开工,这次比赛收获很多,遗憾也不少,收获了很多卷友,遗憾自己未能如心。

从第一个方案出分的惊喜若狂到优化过程中的绞尽脑汁,每一秒的进步都带来了无与伦比的成就感。从为了给女朋友买个电瓶车代步的决心下定开始,仿佛就以注定要在这条道路上一卷无前。

来年,希望张三再发一次橙图(也不一定是橙色),到时候如果我心有余力,肯定很快点进来,然后一键三连。

仓库地址:https://github.com/ainilili/tianchi-race-2021

¶参考

  • Ext4
  • 持久内存架构与工程实践
  • 深入浅出SSD - 固态存储核心技术原理与实战

Round 1C 2021 - Code Jam 2021 - Closest Pick

发表于 2021-05-01 | 分类于 Algorithm

原题地址: 传送门

¶题解

从题目中可以得知,要想赢,必须满足以下规则:

  • 1.不能与已选号码重复
  • 2.所选号码周围的其他号码尽可能未被选中

第一个规则很好理解,第二个规则这里解释一下,也是思路的核心。如果要想赢,需要离中奖号码最近,要想使胜率最大化,那么就要尽可能保证所选号码离所有其余未选号码的距离尽可能大于它们到已选号码,故而,当所选号码周围未选号码最多时,就是最大的胜率。

所以,这题的本质为寻找已选号码有序情况下的前两个最大的间隔,再计算分别计算两个间隔中能保证赢的号码数,再除以号码可选范围即为胜率,分情况考虑,记排序后的已选号码集合为arr,号码可选范围为k,已选号码总数为n:

  • 开头间隔1 ~ arr[0]和结尾间隔k - arr[n - 1]可赢号码数为当前间隔长度
  • 两个已选号码之间的间隔如果为i,此时又要分两种情况考虑:
    • 如果同一个间隔中同时选择两个号码,那么可赢号码数量为当前间隔长度
    • 如果同一个间隔中只选择一个号码,那么可盈号码数量为interval / 2 + interval % 2

¶Golang解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func solve() float64{
inputs := readInts()
n, k, arr := inputs[0], inputs[1], readInts()
sort.Ints(arr)
max1, max2 := arr[0] - 1, k - arr[n - 1]
if max2 > max1 {
max2, max1 = max1, max2
}
maxInterval := 0
for i := 1; i < n; i ++ {
interval := arr[i] - arr[i - 1] - 1
if interval > 0{
if interval > maxInterval {
maxInterval = interval
}
interval = interval / 2 + interval % 2
if interval > max1 {
max2, max1 = max1, interval
}else if interval > max2{
max2 = interval
}
}
}
return float64(max(max1 + max2, maxInterval)) / float64(k)
}

Round 1A 2021 - Code Jam 2021 - Append Sort

发表于 2021-04-14 | 分类于 Algorithm

原题地址: 传送门

¶分析

题目要求是对于输入的X1,X2,…,XN,求至少在每个数字后方增加多少个的0-9来保证X1,X2,…,XN的递增顺序。

举个例子,输入集为98,9,8,很明显当前顺序不是递增的,如果要保证顺序递增,需要在保证数组第二个元素大于第一个元素,第三个元素大于第二个元素。在不考虑题目中添加位数数量限制的情况下,98,900,8000这种通过保证每个元素之间总位数差为1的方案也是蛮不错的,但是如果要求增加尽可能少的位数来达到题目要求,最佳方案为98,99,900,也就是说我们要做到尽可能保证后一个元素的值尽可能接近前一个元素的值。

再举个例子,输入集为1,1,1,1,1,1,1,1,1,1,1,1,1,1,那么补全后的结果应该为1 10 11 12 13 14 15 16 17 18 19 100 101 102。

另外需要注意,此题有两个测试集,Test1还好,每次输入集的X数量以及范围都比较小,用int完全可以搞定,但是对于Test2来说,int不一定能够满足,当X的最大值为10^9,N为100时,在极端情况下(前一个元素总是比后一个元素大),最终补全后的结果的位数最大为10 + 100 - 1 = 109位,此时用int将会溢出导致结果错误,可以通过使用bigint或者自己将大数按位拆位数组来避免溢出的问题!

¶Solve

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package main

import (
"bufio"
"fmt"
"math"
"os"
"strconv"
"strings"
)

var res = make([]int, 0)
var in = bufio.NewReader(os.Stdin)

func solve(){
inputs := readInts(in)
count := 0
nums := readInts(in)
preMark := make([]int, 110)
nextMark := make([]int, 110)
for i := 0; i < inputs[0]; i ++{
if i == 0{
markFlush(preMark, nums[i])
continue
}
preL := markLen(preMark)
markFlush(nextMark, nums[i])
nextL := markLen(nextMark)

if preL > nextL {
diff := preL - nextL
for j := 0; j < nextL; j ++ {
pv := preMark[j]
nv := nextMark[j]
if nv > pv {
for k := 0; k < diff; k ++{
nextMark[nextL + k] = 0
}
count += diff
break
}else if nv < pv {
for k := 0; k < diff + 1; k ++{
nextMark[nextL + k] = 0
}
count += diff + 1
break
}else if j == nextL - 1{
for k := 0; k < diff; k ++{
nextMark[nextL + k] = preMark[nextL + k]
}
count += diff
for k := preL - 1; k > nextL - 1; k --{
nextMark[k] ++
if nextMark[k] < 10{
break
}
nextMark[k] = 0
if k == nextL {
nextMark[preL] = 0
count ++
}
}
break
}
}
}else if preL == nextL{
for j := 0; j < preL; j ++ {
pv := preMark[j]
nv := nextMark[j]
if nv > pv {
break
}
if nv < pv || (nv == pv && j == preL - 1){
count ++
nextMark[nextL] = 0
break
}
}
}
copy(preMark, nextMark)
}
res = append(res, count)
}

func markLen(mark []int) int{
count := 0
for ; count < len(mark); count++ {
if mark[count] == -1 {
break
}
}
return count
}

func markFlush(mark []int, num int) {
l := numLen(num)
for i := 0; i < len(mark); i ++ {
if i < l {
mark[i] = num / pow(10, l - i - 1) % 10
}else{
mark[i] = -1
}
}
}

func pow(a, b int) int{
return int(math.Pow(float64(a), float64(b)))
}

func numLen(num int) int{
count := 0
for{
if num <= 0 {
break
}
num = num / 10
count ++
}
return count
}

func main() {
t := readInts(in)[0]
for i := 0; i < t; i ++ {
solve()
}
for i, v := range res {
fmt.Printf("Case #%d: %d\n", i + 1, v)
}
}

func readline(in *bufio.Reader) string{
line, _ := in.ReadString('\n')
return line[0:len(line) - 1]
}

func readInts(in *bufio.Reader) []int{
line := readline(in)
arr := strings.Split(line, " ")
res := make([]int, 0)
for _, str := range arr {
v, _ := strconv.ParseInt(str, 10, 64)
res = append(res, int(v))
}
return res
}

Kick Start Round A 2021 - Rabbit House

发表于 2021-03-22 | 分类于 Algorithm

原题地址: 传送门

¶分析

对于英文弱鸡的笔者,这道题着实看了很久才看懂题意,同样在一个二维平面内存在着R * C的格子,且每个格子都会有0≤ Gi,j ≤2*10^6各盒子,例如:

1
2
3
0 0 0
0 2 0
0 0 0

上述表示R为3,C为3的格子,中间格子的盒子数量为2,其余格子盒子数量为0。

描述到此为止,最终问题是求输入的R*C的格子中,如果要保证每个格子与相邻格子的盒子数量差在1以内,需要在当前规格的基础上,至少增加多少个盒子?

因为最终要保证整个平面内,所有的格子高度差绝对值 <= 1,所以应该从最高的格子开始,让其相邻格子条件,之后除去当前格子,再取剩下格子中最高的格子循环处理,直到当前最高的格子高度为1时结束循环,将新增的盒子数作为答案输出。

分析后的问题转化为将格子从高到低排列,并优先取出最高的格子做处理,并且格子的高度排列是会随着处理过程发生变化,所以使用优先队列处理之!

¶Solve

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package main

import (
"bufio"
"container/heap"
"fmt"
"os"
"strconv"
"strings"
)

var res = make([]int, 0)

var in = bufio.NewReader(os.Stdin)

func solve(){
inputs := readInts(in)
r := inputs[0]
c := inputs[1]
grid := make([][]int, r)
for i := 0; i < r; i ++ {
grid[i] = readInts(in)
}

nodes := &nodes{}
for i := 0; i < r; i ++ {
for j := 0; j < c; j++ {
heap.Push(nodes, node{
val: grid[i][j],
x: j,
y: i,
})
}
}
m := 0
for ;;{
if nodes.Len() == 0{
break
}
n := heap.Pop(nodes).(node)
if n.val != grid[n.y][n.x] {
continue
}
if n.x > 0 {
if n.val - grid[n.y][n.x - 1] > 1{
m += n.val - grid[n.y][n.x - 1] - 1
grid[n.y][n.x - 1] = n.val - 1
heap.Push(nodes, node{
val: grid[n.y][n.x - 1],
x: n.x - 1,
y: n.y,
})
}
}
if n.x < c -1 {
if n.val - grid[n.y][n.x + 1] > 1{
m += n.val - grid[n.y][n.x + 1] - 1
grid[n.y][n.x + 1] = n.val - 1
heap.Push(nodes, node{
val: grid[n.y][n.x + 1],
x: n.x + 1,
y: n.y,
})
}
}
if n.y > 0 {
if n.val - grid[n.y - 1][n.x] > 1{
m += n.val - grid[n.y - 1][n.x] - 1
grid[n.y - 1][n.x] = n.val - 1
heap.Push(nodes, node{
val: grid[n.y - 1][n.x],
x: n.x,
y: n.y - 1,
})
}
}
if n.y < r - 1 {
if n.val - grid[n.y + 1][n.x] > 1{
m += n.val - grid[n.y + 1][n.x] - 1
grid[n.y + 1][n.x] = n.val - 1
heap.Push(nodes, node{
val: grid[n.y + 1][n.x],
x: n.x,
y: n.y + 1,
})
}
}
}
res = append(res, m)
}

func main() {
t := readInts(in)[0]
for i := 0; i < t; i ++ {
solve()
}
for i, v := range res {
fmt.Printf("Case #%d: %d\n", i + 1, v)
}
}

func readline(in *bufio.Reader) string{
line, _ := in.ReadString('\n')
return line[0:len(line) - 1]
}

func readInts(in *bufio.Reader) []int{
line := readline(in)
arr := strings.Split(line, " ")
res := make([]int, 0)
for _, str := range arr {
v, _ := strconv.ParseInt(str, 10, 64)
res = append(res, int(v))
}
return res
}

type node struct {
val int
x int
y int
}

type nodes []node

func (t *nodes) Len() int {
return len(*t) //
}

func (t *nodes) Less(i, j int) bool {
return (*t)[i].val >= (*t)[j].val
}

func (t *nodes) Swap(i, j int) {
(*t)[i], (*t)[j] = (*t)[j], (*t)[i]
}

func (t *nodes) Push(x interface{}) {
*t = append(*t, x.(node))
}

func (t *nodes) Pop() interface{} {
n := len(*t)
if n > 0 {
x := (*t)[n-1]
*t = (*t)[:n-1]
return x
}
return -1
}

func (t *nodes) Peek() interface{} {
n := len(*t)
if n > 0 {
return (*t)[0]
}
return -1
}

Kick Start Round A 2021 - L Shaped Plots

发表于 2021-03-22 | 分类于 Algorithm

原题地址: 传送门

¶分析

在二维坐标系中由很多坐标点组成段,段的最小长度为2,且段只有x轴(横轴)和y轴(纵轴)之分,求由两个段组成,垂直有共同顶点,且长边是短边两倍的组合数量。

输入集是一个二维数组,要求中的两个线段互相垂直,这里可以分析出组合中的两个线段一个在x轴一个在y轴。一种思路是通过遍历二维数组得到x和y轴所有的段,再对每个段的端点做索引,再做dfs求组合数,不过这种简单的思路在Test2会出现超时或内存溢出的错误,所以要换一种更简单的思路。

个人解决思路是先对x或y任意维度做head的索引(y轴由上到下,head为上顶点,x轴head为左顶点),例如先对y轴所有段的head做索引,这里直接使用二维数组,假设y轴段的索引定义为二维数组indexs = [1000][1000]int, indexs第一层下标为x轴坐标,第二层下标为y轴坐标,值为段的长度。之后再遍历获取所有的x轴段,对于x轴的每个段,取其对应的head和tail坐标以及长度len,通过枚举所有满足的y轴坐标值:

  • y1 (head.x, head.y - (len/2) + 1)
  • y2 (head.x, head.y)
  • y3 (head.x, head.y - (len*2) + 1)
  • y4 (tail.x, tail.y - (len/2) + 1)
  • y5 (tail.x, tail.y)
  • y6 (tail.x, tail.y - (len*2) + 1)

(y1和y4只有当len为偶数且大于等于4的时候才成立)
之后通过得到的y轴坐标去检索indexs,如果存在则判断对应段的长度是否满足与当前x轴段长的规则,如果满足则count ++,所有x轴段匹配完毕,输出count为结果值。

¶Slove

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package main

import (
"bufio"
"fmt"
"os"
"strconv"
"strings"
)

var res = make([]int, 0)

var in = bufio.NewReader(os.Stdin)



func solve(){
inputs := readInts(in)
r := inputs[0]
c := inputs[1]
grid := make([][]int, r)
for i := 0; i < r; i ++ {
grid[i] = readInts(in)
}
cols := [1000][1000]int{}
for i := 0; i < c; i ++ {
y := -1
for j := 0; j < r; j ++ {
if grid[j][i] == 1{
if y == -1 {
y = j
}
}
if y != -1 && (grid[j][i] != 1 || j == r - 1){
d := j - y
if j == r - 1 && grid[j][i] == 1{
d ++
}
if d > 1 {
for di := 0; di < d; di ++ {
for dj := di + 1; dj < d; dj ++{
cols[i][y + di] = dj - di + 1
}
}
}
y = -1
}
}
}
count := 0
for i := 0; i < r; i ++ {
x := -1
for j := 0; j < c; j ++ {
if grid[i][j] == 1{
if x == -1 {
x = j
}
}
if x != -1 &&(grid[i][j] != 1 || j == c - 1){
d := j - x
if j == c - 1 && grid[i][j] == 1{
d ++
}
if d > 1 {
for di := 0; di < d; di ++ {
for dj := di + 1; dj < d; dj ++{
hx := x + di
hy := i
tx := x + dj
ty := i
l := dj - di + 1

if l % 2 == 0 && l >= 4{
chy := hy - l / 2 + 1
cty := ty - l / 2 + 1
if chy >= 0 {
if n := cols[hx][chy]; n >= l / 2{
count ++
}
}
if n := cols[hx][hy]; n >= l / 2 {
count ++
}
if cty >= 0 {
if n := cols[tx][cty]; n >= l / 2{
count ++
}
}
if n := cols[tx][ty]; n >= l / 2 {
count ++
}
}
chy := hy - l * 2 + 1
cty := ty - l * 2 + 1
if chy >= 0 {
if n := cols[hx][chy]; n >= l * 2 {
count ++
}
}
if n := cols[hx][hy]; n >= l * 2 {
count ++
}
if cty >= 0 {
if n := cols[tx][cty]; n >= l * 2 {
count ++
}
}
if n := cols[tx][ty]; n >= l * 2 {
count ++
}
}
}
}
x = -1
}
}
}

res = append(res, count)
}

func main() {
t := readInts(in)[0]
for i := 0; i < t; i ++ {
solve()
}
for i, v := range res {
fmt.Printf("Case #%d: %d\n", i + 1, v)
}
}

func readline(in *bufio.Reader) string{
line, _ := in.ReadString('\n')
return line[0:len(line) - 1]
}

func readInts(in *bufio.Reader) []int{
line := readline(in)
arr := strings.Split(line, " ")
res := make([]int, 0)
for _, str := range arr {
v, _ := strconv.ParseInt(str, 10, 64)
res = append(res, int(v))
}
return res
}

Kick Start Round A 2021 - K-Goodness String

发表于 2021-03-22 | 分类于 Algorithm

原题地址: 传送门

¶分析

题意大概是当字符串对称位置的字母如果不相同则score加一分,输入字符串以及期望的score分数,求将当前字符串变动任意位置的字母使之score等于期望的socre。

其实就是求字符串对称位置字符不相同的数量,然后和期望score做差再取abs。

¶Slove

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main

import (
"bufio"
"fmt"
"os"
"strconv"
"strings"
)

var res = make([]int, 0)
var in = bufio.NewReader(os.Stdin)

func solve(){
inputs := readInts(in)
n := inputs[0]
k := inputs[1]
str := readline(in)
s := 0

for i := 0; i < len(str); i++ {
r := n - 1 - i
if r <= i {
break
}
if str[i] != str[r] {
s ++
}
}
result := k - s
if result < 0{
result = - result
}
res = append(res, result)
}

func main() {
t := readInts(in)[0]
for i := 0; i < t; i ++ {
solve()
}
for i, v := range res {
fmt.Printf("Case #%d: %d\n", i + 1, v)
}
}

func readline(in *bufio.Reader) string{
line, _ := in.ReadString('\n')
return line[0:len(line) - 1]
}

func readInts(in *bufio.Reader) []int{
line := readline(in)
arr := strings.Split(line, " ")
res := make([]int, 0)
for _, str := range arr {
v, _ := strconv.ParseInt(str, 10, 64)
res = append(res, int(v))
}
return res
}

如何设计并实现一个db连接池?

发表于 2019-05-26

¶连接池的使命!

无论是线程池还是db连接池,他们都有一个共同的特性:资源复用,在普通的场景中,我们使用一个连接,它的生命周期可能是这样的:

一个连接,从创建完毕到销毁,期间只被使用了一次(这里的一次是指在单个作用域内的使用),当周期结束,另外一个调用者仍然需要这个连接去做事,就要重复去经历这种生命周期。因为创建和销毁都是需要对应的服务消耗时间以及系统资源去处理的,这样不仅浪费了大量的系统资源,而且导致业务响应过程中都要花费部分时间去重复的创建和销毁,得不偿失,而连接池便被赋予了解决这种问题的使命!

¶连接池需要做什么?

顾名思义,连接池中的池字已经很生动形象的阐明了它的用意,它用将所有连接放入一个"池子"中统一的去控制连接的创建和销毁,和原始生命周期去对比,连接池多了以下特性:

  • 创建并不是真的创建,而是从池子中选出空闲连接。
  • 销毁并不是真的销毁,而是将使用中的连接放回池中(逻辑关闭)。
  • 真正的创建和销毁由线程池的特性机制来决定。

因此,当使用连接池后,我们使用一个连接的生命周期将会演变成这样:

¶分析计划

通灵之术 - 传送门:https://github.com/ainilili/honeycomb,DEMO为Java语言实现!

事前,我们需要点支烟分析一下时间一个连接池需要做哪些事情:

  • 保存连接的容器是必不可少的,另外,该容器也要支持连接的添加和移除功能,并保证线程安全。
  • 我们需要因为要对连接的销毁做逻辑调整,我们需要重写它的close以及isClosed方法。
  • 我们需要有个入口对连接池做管理,例如回收空闲连接。

连接池不仅仅只是对Connection生命周期的控制,还应该加入一些特色,例如初始连接数,最大连接数,最小连接数、最大空闲时长以及获取连接的等待时长,这些我们也简单支持一下。

目标以明确,开始动工。

¶连接池容器选型

要保证线程安全,我们可以将目标瞄准在JUC包下的神通们,设我们想要的容器为x,那么x不仅需要满足基本的增删改查功能,而且也要提供获取超时功能,这是为了保证当池内长时间没有空闲连接时不会导致业务阻塞,即刻熔断。另外,x需要满足双向操作,这是为了连接池可以识别出饱和的空闲连接,方便回收操作。

综上所述,LinkedBlockingDeque是最合适的选择,它使用InterruptibleReentrantLock来保证线程安全,使用Condition来做获取元素的阻塞,另外支持双向操作。

另外,我们可以将连接池拆分为3个类型:

  • 工作池:存放正在被使用的连接。
  • 空闲池:存放空闲连接。
  • 回收池:已经被回收(物理关闭)的连接。

其中,工作池和回收池大可不必用双向对列,或许用单向队列或者Set都可以代替之:

1
2
3
private LinkedBlockingQueue<HoneycombConnection> workQueue;
private LinkedBlockingDeque<HoneycombConnection> idleQueue;
private LinkedBlockingQueue<HoneycombConnection> freezeQueue;

¶Connection的装饰

连接池的输出是Connection,它代表着一个db连接,上游服务使用它做完操作后,会直接调用它的close方法来释放连接,而我们必须做的是在调用者无感知的情况下改变它的关闭逻辑,当调用close的方法时,我们将它放回空闲队列中,保证其的可复用性!

因此,我们需要对原来的Connection做装饰,其做法很简单,但是很累,这里新建一个类来实现Connection接口,通过重写所有的方法来实现一个**“可编辑”**的Connection,我们称之为Connection的装饰者:

1
2
3
4
5
6
7
8
9
10
public class HoneycombConnectionDecorator implements Connection{

protected Connection connection;

protected HoneycombConnectionDecorator(Connection connection) {
this.connection = connection;
}

此处省略对方法实现的三百行代码...
}

之后,我们需要新建一个自己的Connection来继承这个装饰者,并重写相应的方法:

1
2
3
4
5
6
7
8
9
public class HoneycombConnection extends HoneycombConnectionDecorator implements HoneycombConnectionSwitcher{
@Override
public void close() { do some things }

@Override
public boolean isClosed() throws SQLException { do some things }

省略...
}

¶DataSource的重写

DataSource是JDK为了更好的统合和管理数据源而定义出的一个规范,获取连接的入口,方便我们在这一层更好的扩展数据源(例如增加特殊属性),使我们的连接池的功能更加丰富,我们需要实现一个自己的DataSource能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class HoneycombWrapperDatasource implements DataSource{
protected HoneycombDatasourceConfig config;
省略其它方法的实现...
@Override
public Connection getConnection() throws SQLException {
return DriverManager.getConnection(config.getUrl(), config.getUser(), config.getPassword());
}

@Override
public Connection getConnection(String username, String password) throws SQLException {
return DriverManager.getConnection(config.getUrl(), username, password);
}
省略其它方法的实现...
}

我们完成了对数据源的实现,但是这里获取连接的方式是物理创建,我们需要满足池化的目的,需要重写HoneycombWrapperDatasource中的连接获取逻辑,做法是创建一个新的类对父类方法重写:

1
2
3
4
5
6
7
8
public class HoneycombDataSource extends HoneycombWrapperDatasource{
private HoneycombConnectionPool pool;
@Override
public Connection getConnection() throws SQLException {
这里实现从pool中取出连接的逻辑
}
省略...
}

¶特性扩展

在当前结构体系下,我们的连接池逐渐浮现出了雏形,但远远不够的是,我们需要在此结构下可以做自由的扩展,使连接池对连接的控制更加灵活,因此我们可以引入特性这个概念,它允许我们在其内部访问连接池,并对连接池做一系列的扩展操作:

1
2
3
public abstract class AbstractFeature{
public abstract void doing(HoneycombConnectionPool pool);
}

AbstractFeature抽象父类需要实现doing方法,我们可以在方法内部实现对连接池的控制,其中一个典型的例子就是对池中空闲连接左回收:

1
2
3
4
5
6
public class CleanerFeature extends AbstractFeature{
@Override
public void doing(HoneycombConnectionPool pool) {
这里做空闲连接的回收
}
}

¶落实计划

经过上述分析,要完成一个连接池,需要这些模块的配合,总体流程如下:

¶第一步:设置数据源属性

在初始化DataSource之前,我们需要将各属性设置进去,这里使用HoneycombWrapperDatasource中的HoneycombDatasourceConfig来承载各属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class HoneycombDatasourceConfig {

//db url
private String url;

//db user
private String user;

//db password
private String password;

//driver驱动
private String driver;

//初始化连接数,默认为2
private int initialPoolSize = 2;

//最大连接数,默认为10
private int maxPoolSize = 10;

//最小连接数,默认为2
private int minPoolSize = 2;

//获取连接时,最大等待时长,默认为60s
private long maxWaitTime = 60 * 1000;

//最大空闲时长,超出要被回收,默认为20s
private long maxIdleTime = 20 * 1000;

//特性列表
private List<AbstractFeature> features;

public HoneycombDatasourceConfig() {
features = new ArrayList<AbstractFeature>(5);
}

省略getter、setter....

¶第二步:初始化连接池

设置好属性之后,我们需要完成连接池的初始化工作,在HoneycombDataSource的init方法中实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private void init() throws ClassNotFoundException, SQLException {
//阻塞其他线程初始化操作,等待初始化完成
if(initialStarted || ! (initialStarted = ! initialStarted)) {
if(! initialFinished) {
try {
INITIAL_LOCK.lock();
INITIAL_CONDITION.await();
} catch (InterruptedException e) {
} finally {
INITIAL_LOCK.unlock();
}
}
return;
}

//config参数校验
config.assertSelf();

Class.forName(getDriver());

//实例化线程池
pool = new HoneycombConnectionPool(config);

//初始化最小连接
Integer index = null;
for(int i = 0; i < config.getInitialPoolSize(); i ++) {
if((index = pool.applyIndex()) != null) {
pool.putLeisureConnection(createNativeConnection(pool), index);
}
}

//触发特性
pool.touchFeatures();

//完成初始化并唤醒其他阻塞
initialFinished = true;
try {
INITIAL_LOCK.lock();
INITIAL_CONDITION.signalAll();
}catch(Exception e) {
}finally {
INITIAL_LOCK.unlock();
}
}

¶第三步:创建初始连接

在init的方法中,如果initialPoolSize大于0,会去创建指定数量的物理连接放入连接池中,创建数量要小于最大连接数maxPoolSize:

1
2
3
public HoneycombConnection createNativeConnection(HoneycombConnectionPool pool) throws SQLException {
return new HoneycombConnection(super.getConnection(), pool);
}

完成初始化后,下一步就是获取连接。

¶第四步:从空闲池获取

我们之前将连接池分成了三个,它们分别是空闲池、工作池和回收池。

我们可以通过HoneycombDataSource的getConnection方法来获取连接,当我们需要获取时,首先考虑的是空闲池是否有空闲连接,这样可以避免创建和激活新的连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public Connection getConnection() throws SQLException {
try {
//初始化连接池
init();
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}

HoneycombConnection cn = null;
Integer index = null;

if(pool.assignable()) {
//空闲池可分配,从空闲池取出
cn = pool.getIdleConnection();
}else if(pool.actionable()) {
//回收池可分配,从回收池取出
cn = pool.getFreezeConnection();
}else if((index = pool.applyIndex()) != null) {
//如果连接数未满,创建新的物理连接
cn = pool.putOccupiedConnection(createNativeConnection(pool), index);
}

if(cn == null) {
//如果无法获取连接,阻塞等待空闲池连接
cn = pool.getIdleConnection();
}

if(cn.isClosedActive()) {
//如果物理连接关闭,则获取新的连接
cn.setConnection(super.getConnection());
}
return cn;
}

¶第五步:从回收池获取

如果空闲池不可分配,那么说明连接供不应求,也许之前有些空闲连接已经被回收(物理关闭),那么我们在创建新连接之前,可以到回收池看一下是否存在已回收连接,如果存在直接取出:

1
2
3
4
else if(pool.actionable()) {
//回收池可分配,从回收池取出
cn = pool.getFreezeConnection();
}

¶第六步:创建新的连接

如果回收池也不可分配,此时要判断连接池连接数量是否已经达到最大连接,如果没有达到,创建新的物理连接并直接添加到工作池中:

1
2
3
4
else if((index =  pool.applyIndex()) != null) {
//如果连接数未满,创建新的物理连接,添加到工作池
cn = pool.putOccupiedConnection(createNativeConnection(pool), index);
}

¶第七步:等待空闲池的连接

如果上述三种情况都不满足,那么只能从空闲池等待其他连接的释放:

1
2
3
4
if(cn == null) {
//如果无法获取连接,阻塞等待空闲池连接
cn = pool.getIdleConnection();
}

具体逻辑封装在HoneycombConnectionPool的getIdleConnection方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public HoneycombConnection getIdleConnection() {
try {
//获取最大等待时间
long waitTime = config.getMaxWaitTime();
while(waitTime > 0) {
long beginPollNanoTime = System.nanoTime();

//设置超时时间,阻塞等待其他连接的释放
HoneycombConnection nc = idleQueue.poll(waitTime, TimeUnit.MILLISECONDS);
if(nc != null) {
//状态转换
if(nc.isClosed() && nc.switchOccupied() && working(nc)) {
return nc;
}
}
long timeConsuming = (System.nanoTime() - beginPollNanoTime) / (1000 * 1000);

//也许在超时时间内获取到了连接,但是状态转换失败,此时刷新超时时间
waitTime -= timeConsuming;
}
} catch (Exception e) {
e.printStackTrace();
}finally {
}
throw new RuntimeException("获取连接超时");
}

¶第八步:激活连接

最后,判断一下连接是否被物理关闭,如果是,我们需要打开新的连接替换已经被回收的连接:

1
2
3
4
if(cn.isClosedActive()) {
//如果物理连接关闭,则获取新的连接
cn.setConnection(super.getConnection());
}

¶连接的回收

如果在某段时间内我们的业务量剧增,那么需要同时工作的连接将会很多,之后过了不久,我们的业务量下降,那么之前已经创建的连接明显饱和,这时就需要我们对其进行回收,我们可以通过AbstractFeature入口操作连接池。

对于回收这个操作,我们通过CleanerFeature来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class CleanerFeature extends AbstractFeature{

private Logger logger = LoggerFactory.getLogger(CleanerFeature.class);

public CleanerFeature(boolean enable, long interval) {
//enable表示是否启用
//interval表示扫描间隔
super(enable, interval);
}

@Override
public void doing(HoneycombConnectionPool pool) {
LinkedBlockingDeque<HoneycombConnection> idleQueue = pool.getIdleQueue();
Thread t = new Thread() {
@Override
public void run() {
while(true) {
try {
//回收扫描间隔
Thread.sleep(interval);

//回收时,空闲池上锁
synchronized (idleQueue) {
logger.debug("Cleaner Model To Start {}", idleQueue.size());
//回收操作
idleQueue.stream().filter(c -> { return c.idleTime() > pool.getConfig().getMaxIdleTime(); }).forEach(c -> {
try {
if(! c.isClosedActive() && c.idle()) {
c.closeActive();
pool.freeze(c);
}
} catch (SQLException e) {
e.printStackTrace();
}
});
logger.debug("Cleaner Model To Finished {}", idleQueue.size());
}
}catch(Throwable e) {
logger.error("Cleaner happended error", e);
}
}
}
};
t.setDaemon(true);
t.start();
}
}

这里的操作很简单,对空闲池加锁,扫描所有连接,释放空闲时间超过最大空闲时间设置的连接,其实这里只要知道当前连接的空闲时长就一目了然了,我们在连接放入空闲池时候去刷新他的空闲时间点,那么当前的空闲时长就等于当前时间减去空闲开始时间:

1
idleTime = nowTime - idleStartTime

在切换状态为空闲时刷新空闲开始时间:

1
2
3
4
 @Override
public boolean switchIdle() {
return unsafe.compareAndSwapObject(this, statusOffset, status, ConnectionStatus.IDLE) && flushIdleStartTime();
}

¶测试一下

体验成果的最快途径就是投入使用,这里搞一个单元测试体验一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static ThreadPoolExecutor tpe = new ThreadPoolExecutor(1000, 1000, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

@Test
public void testConcurrence() throws SQLException, InterruptedException{
long start = System.currentTimeMillis();
HoneycombDataSource dataSource = new HoneycombDataSource();
dataSource.setUrl("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&transformedBitIsBoolean=true&zeroDateTimeBehavior=CONVERT_TO_NULL&serverTimezone=Asia/Shanghai");
dataSource.setUser("root");
dataSource.setPassword("root");
dataSource.setDriver("com.mysql.cj.jdbc.Driver");
dataSource.setMaxPoolSize(50);
dataSource.setInitialPoolSize(10);
dataSource.setMinPoolSize(10);
dataSource.setMaxWaitTime(60 * 1000);
dataSource.setMaxIdleTime(10 * 1000);
dataSource.addFeature(new CleanerFeature(true, 5 * 1000));

test(dataSource, 10000);
System.out.println(System.currentTimeMillis() - start + " ms");
}

public static void test(DataSource dataSource, int count) throws SQLException, InterruptedException {
CountDownLatch cdl = new CountDownLatch(count);
for(int i = 0; i < count; i ++) {
tpe.execute(() -> {
try {
HoneycombConnection connection = (HoneycombConnection) dataSource.getConnection();
Statement s = connection.createStatement();
s.executeQuery("select * from test limit 1");
connection.close();
}catch(Exception e) {
}finally {
cdl.countDown();
}
});
}
cdl.await();
tpe.shutdown();
}

PC配置:Intel® Core™ i5-8300H CPU @ 2.30GHz 2.30 GHz 4核8G 512SSD

10000次查询,耗时:

1
938 ms

结束语:再次召唤传送门:https://github.com/ainilili/honeycomb

深入JAVA8的HashMap实现原理

发表于 2018-12-25 | 分类于 数据结构

¶引导

在了解HashMap之前,我们应该先明白两个概念:Hash和Map,这可以帮助我们更容易了解HashMap的运行原理。

那么何为Hash,又何为Map呢?

¶Hash

之前写过一篇关于Hash的文章 Hash

¶Map

Map是一种K-V形式的数据结构,一个唯一的key,会唯一对应一个value。也就是说,在Map容器里不允许两个一模一样的key。

一个简单的Map结构如下:

1
2
3
4
5
{
"key1":"value1",
"key2":"value2",
"key3":"value3"
}

对于这种数据结构,并且Map会对外提供一些方法来实现对内部数据的操作:

1
2
3
4
V put(K key, V value)
V get(Object key)
V remove(Object key)
boolean containsKey(Object key)

可见Map对于我们操作K-V形式的数据非常方便,实现的方式有很多,最简单粗暴的实现方式是使用List来存储每一个K-V组对,对于每种方法的实现只需要暴力循环碰撞即可,对于少量数据这种做法未必不可,如果数据量庞大之千万,我们就要换一种更加高效,速度更快的实现方式:HashMap。

¶HashMap

Map在Java中的实现有很多,HashMap便是其中之一,在JDK漫长的版本更新中,HashMap的实现也是在不断的更新着:

  • <=JDK1.7:Table数组 + Entry链表
  • >=JDK1.8:Table数组 + Entry链表/红黑树

本文我们跳过JDK1.7的实现,来看一下1.8中HashMap源码所带来的魅力冲击!

¶实现原理

对于各个版本的HashMap实现原理,主线流程都是一成不变的:

hashmap原理流程图

这里有两个数据结构需要我们知道:

  • Table:哈希表,存放Node元素。
  • Node:结点元素,存放K-V组对信息,其结构是一个链表/红黑树。

另外,在HashMap内部有一些关键属性我们也要了解一下:

  • DEFAULT_INITIAL_CAPACITY:Table数组初始长度,默认为1 << 4,2^4 = 16。
  • MAXIMUM_CAPACITY:Table数组最高长度,默认为1 << 30,2^30 = 1073741824。
  • DEFAULT_LOAD_FACTOR:负载因子,当总元素数 > 数组长度 * 负载因子时,Table数组将会扩容,默认为0.75。
  • TREEIFY_THRESHOLD:树化阈值,当单个Table内Node数量超过该值,则会将链表转化为红黑树,默认为8。
  • UNTREEIFY_THRESHOLD:链化阈值,当扩容期间单个Table内Entry数量小于该值,则将红黑树转化为链表,默认为6。
  • MIN_TREEIFY_CAPACITY:最小树化阈值,当Table所有元素超过改值,才会进行树化(为了防止前期阶段频繁扩容和树化过程冲突)。
  • size:Table数组当前所有元素数。
  • threshold:下次扩容的阈值(数组长度 * 负载因子)

HashMap的内部有着一个Table数组,而这个数组的初始长度为DEFAULT_INITIAL_CAPACITY参数值,Table数组存放的元素类型就是Node,它是一个单向链表:

1
2
3
4
5
6
static class Node<K,V> implements Map.Entry<K,V> {
final int hash; //key的hash值
final K key; //key
V value; //value
Node<K,V> next; //下一个结点
}

每个Table中存的Node元素相当于链表的header,next指向下一个结点,而这种链式结构的存在正是为了解决hash冲突:

hash冲突:两个元素的经过Hash散列之后分在同一个组内,我们将之解释为Hash冲突

在JDK1.7之前的版本,hash冲突的解决方法是将被冲突的Node结点放于一个链表中,而Table中的元素则是链头,当然在JDK1.8中,当Table中链长超过TREEIFY_THRESHOLD阈值后,将会将链表转变为红黑树的实现TreeNode:

1
2
3
4
5
6
7
static final class TreeNode<K,V> extends LinkedHashMap.Entry<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
}

当发生hash冲突的Node不断变多,那么这个链将会越来越长,那么遍历碰撞key时的耗时就会不断增加,这也就直接导致了性能的不足,从JDK1.8开始,HashMap对于单个Table中的Node超出某个阈值时,将会开始树化操作(链表转化为红黑树),这对于搜索的性能将会有很大的提升,而插入和删除的操作所带来的性能影响微乎其微。

¶put方法

在HashMap的内部会有一个Table数组,这个数组的当前长度就是我们要实现映射的目标范围,当我们执行put方法时,key和value要经历这些事情:

  • 通过Hash散列获取到对应的Table
  • 遍历Table下的Node结点,做更新/添加操作
  • 扩容检测

具体实现我们可以根据源码来详细了解一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
// HashMap的懒加载策略,当执行put操作时检测Table数组初始化。
n = (tab = resize()).length;
if ((p = tab[i = (n - 1) & hash]) == null)
//通过``Hash``函数获取到对应的Table,如果当前Table为空,则直接初始化一个新的Node并放入该Table中。
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
//输入的key命中了当前Table的首元素,直接更新。
e = p;
else if (p instanceof TreeNode)
//如果当前Node类型为TreeNode,调用``putTreeVal``方法。
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
//如果不是TreeNode,则就是链表,遍历并与输入key做命中碰撞。
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
//如果当前Table中不存在当前key,则添加。
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
//超过了``TREEIFY_THRESHOLD``则转化为红黑树。
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
//做命中碰撞,使用hash、内存和equals同时判断(不同的元素hash可能会一致)。
break;
p = e;
}
}
if (e != null) {
//如果命中不为空,更新操作。
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
//扩容检测。
resize();
afterNodeInsertion(evict);
return null;
}

对于其过程中的关于Node链表和红黑树的转换过程我们可以暂时屏蔽掉,那么整个流程并不是很绕,那么我们继续深入的来看一下HashMap的扩容实现。

¶resize方法

HashMap的扩容大致的实现是将老Table数组中所有的Entry取出来,重新对其hashcode做Hash散列到新的新的Table之中,也就是一个re-put的过程,具体还是通过源码来讲解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
final Node<K,V>[] resize() {
//保留老的hash表
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
//如果之前的容量大于0
if (oldCap > 0) {
//如果超出最大容量
if (oldCap >= MAXIMUM_CAPACITY) {
//扩容阈值为int最大值
threshold = Integer.MAX_VALUE;
return oldTab;
}
//否则计算扩容后的阈值
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0)
// 如果之前的容量等于0,并且之前的阈值大于零,则新的hash表长度就等于它
newCap = oldThr;
else {
// 初始阈值为零表示使用默认值
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
//如果新的阈值为 0 ,就得用 新容量*加载因子 重计算一次
if (newThr == 0) {

float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
//常见扩容后的hash表
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab; //A
if (oldTab != null) {
//遍历旧的hash表,将之内部元素转移到新的hash表
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
//如果当前Table内只有一个元素,重新做hash散列并赋值
newTab[e.hash & (newCap - 1)] = e; //B
else if (e instanceof TreeNode)
//如果旧哈希表中这个位置的桶是树形结构,就要把新哈希表里当前桶也变成树形结构
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { //保留旧哈希表桶中链表的顺序
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do { //遍历当前Table内的Node,赋值给新的Table
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}

¶get方法

在我们看完HashMap对于put方法的实现之后,get方法则显得简单易懂,其代码与put相近无几,主要差别是没有了扩容和添加/更新的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
//判断hash表是否为空,表重读是否大于零并且当前key对应分布的表内是否有Node存在
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
if (first.hash == hash &&
((k = first.key) == key || (key != null && key.equals(k))))
// 检测第一个Node,命中则不需要在做do...while...循环
return first;
if ((e = first.next) != null) {
if (first instanceof TreeNode)
//如果Table内是树形结构,则使用对应的检索方法
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
do { //如果是链表,则做while循环,直到命中或者遍历结束
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}

¶containsKey方法

根据get方法的结果是否为空就可以直到是否包含该key:

1
2
3
public boolean containsKey(Object key) {
return getNode(hash(key), key) != null;
}

¶remove方法

同样类似于put操作,首先会查找对应的key所在位置,如果为空,则不操作,反之,将之移除:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
final Node<K,V> removeNode(int hash, Object key, Object value,
boolean matchValue, boolean movable) {
Node<K,V>[] tab; Node<K,V> p; int n, index;
//判断hash表是否为空,表重读是否大于零并且当前key对应分布的表内是否有Node存在
if ((tab = table) != null && (n = tab.length) > 0 &&
(p = tab[index = (n - 1) & hash]) != null) {
Node<K,V> node = null, e; K k; V v;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
// 第一个Node命中
node = p;
else if ((e = p.next) != null) {
if (p instanceof TreeNode)
//如果Table内是树形结构,则使用对应的检索方法
node = ((TreeNode<K,V>)p).getTreeNode(hash, key);
else {
do { //如果是链表,则做while循环,直到命中或者遍历结束
if (e.hash == hash &&
((k = e.key) == key ||
(key != null && key.equals(k)))) {
node = e;
break;
}
p = e;
} while ((e = e.next) != null);
}
}
if (node != null && (!matchValue || (v = node.value) == value ||
(value != null && value.equals(v)))) {
//如果命中到了对应的Node,则根据Node结构进行对应的移除操作
if (node instanceof TreeNode)
((TreeNode<K,V>)node).removeTreeNode(this, tab, movable);
else if (node == p)
tab[index] = node.next;
else
p.next = node.next;
++modCount;
//修改hash表元素数
--size;
afterNodeRemoval(node);
return node;
}
}
return null;
}

¶为何线程不安全?

看完了HashMap的实现之后,就该谈一谈它为什么存在线程安全问题!

¶数据丢失

首先,我们将目光放在put方法的实现中,假设有两个线程在同时进行put操作,对应的数据分别为:

1
2
thread-1: put(1, 'abc');
thread-2: put(1, 'efg');

假设此时Hash表的长度为10,且已经有两个元素在,负载因子为默认值0.75f,那么操作过程一定不会扩容,并且两个线程put的key都是1,那么它们将会分配到同一个table中,下方代码为put方法中的其中一段,其主要作用是遍历当前表内Node,寻找与当前key一样的Node结点,之后再做添加/更新操作。

1
2
3
4
5
6
7
8
9
10
11
12
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null); // A
if (binCount >= TREEIFY_THRESHOLD - 1)
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}

假设两个线程同时执行到了A这个位置,此时获取到的p是统一个对象,下一刻,cpu运转,两个线程同时运行,那么p.next的值将会是最后一个线程put的value值,而前一个则会丢失,这就会导致丢数据的情况!

当然该情景同样会发生于resize和remove操作,至于为什么,大家可以思考一下!

¶size不准确

这个就很简单了,为什么不准确呢,来看一下size变量在HashMap内部的定义:

1
transient int size;

内存不可见并且增减操作未加锁,多线程操作下属于非原子操作!

¶闭环死锁

这个问题在JDK1.8版本的HashMap中已经不存在了,至于为啥,我要先讲一下在1.8之前的HashMap为什么会存在闭环死锁问题!

从闭环这个名词上我们分析一下是什么问题,什么是闭环的,如果链表形成了一个环会不会就是闭环呢?而链表如何才会形成环?带着这些问题,我们在脑海中抽象出一个模型:

1
2
3
graph LR
A-->B
B-->A

假设某一个Table中的Node链表发生了上述问题,那么我们在遍历时进行do{ }while ((e = e.next) != null);操作就会发生死锁的问题,那么看来我们的猜想方向是正确的,那么我们就具体分析一下HashMap在什么操作之中会产生闭环的问题,不过在此之前,我们要明白因果:

1
2
因:???
果:闭环

我们知道,只有当两个结点内部的next相互引用对方的时候才会死锁,这种场景只能在两个已经存在同一个链上的结点同时以相反的方向被操作next引用的时候才会发生,而在HashMap内部,符合这种场景的只有一个方法:resize,那我们就来看一下JDK1.7的resize方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity == MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return;
}

Entry[] newTable = new Entry[newCapacity];
boolean oldAltHashing = useAltHashing;
useAltHashing |= sun.misc.VM.isBooted() &&
(newCapacity >= Holder.ALTERNATIVE_HASHING_THRESHOLD);
boolean rehash = oldAltHashing ^ useAltHashing;
//fu
transfer(newTable, rehash);
table = newTable;
threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

进入transfer方法中,其内部实现了扩容过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) { // A
while(null != e) {
Entry<K,V> next = e.next;
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];
newTable[i] = e;
e = next;
}
}
}

我们发现,在JDK1.7的HashMap的扩容实现中,老的Table中的Node链的顺序赋值给新的Table中时的操作是反置的:

1
2
3
e.next = newTable[i];
newTable[i] = e;
e = next;

上述操作是将当前Node的next指针指向当前Table的头结点,之后当前Node又变为了Table的头结点,此时假设A、B两个线程同时执行到了transfer方法中的A位置,并且此时的oldTable和newTable的结构是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
oldTable[]
table-1: a -> b -> c -> null
table-2: null
table-3: null

newTable[]
table-1: null
table-2: null
table-3: null
table-4: null
table-5: null
table-6: null

如果很巧,两个线程在同一个CPU上执行,那么就会存在一个抢占时间片的场景,假设A先抢到了时间片,然后执行一番操作之后,oldTable和newTable的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
oldTable[]
table-1: a -> null
table-2: null
table-3: null

newTable[]
table-1: null
table-2: c -> b -> a
table-3: null
table-4: null
table-5: null
table-6: null

之后还没等它做oldTable = newTable操作,B抢到了时间片,并也做了同样一番操作,oldTable和newTable的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
oldTable[]
table-1: a -> null
table-2: null
table-3: null

newTable[]
table-1: null
table-2: a -> c -> b -> a
table-3: null
table-4: null
table-5: null
table-6: null

此时A或者B谁先oldTable = newTable已经无所谓了,因为newTable中已经产生了闭环,之后在进行get或者put操作时,如果不小心触发到了while循环,那将会一直死循环:

1
2
3
do{
//do some thing
}while ((e = e.next) != null); //e = e.next将会永不为空

从上述场景产生的过程中我们发现,a -> c -> b -> a这种闭环问题的罪魁祸首是因为1.7中的HashMap在扩容时为了免去再次遍历链表,很聪明的将当前结点作为新链表的头结点,这就会导致顺序反转,所以无序化导致了闭环的产生,而这种问题不仅仅是在HashMap中体现,Mysql的死锁问题的原因常常也是因为反序加行锁导致的!

而在开头说过,JDK1.8已经避免了这个问题,这是为什么呢?看下代码就知道了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead; //A
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead; //B
}
}

同样是扩容的操作,JDK1.8中的HashMap通过两个链分别去存储头结点和尾结点以保证它有序,并且不会频繁的去赋值newTable,而是在循环之后直接赋值(请注意A、B标记处),这样就非常简单的避免了产生闭环的陷阱!

设计模式一:单例模式

发表于 2018-12-24 | 分类于 设计模式

¶什么是单例模式

单例是最常用的设计模式之一,其表达的最主要的意思是一个对象在整个jvm堆内存中只有一个实例,这样可以保证无论从任何代码块获取的单例实例都是唯一的。

单例的优缺点也很明显,优点有以下这些:

  • 在单例模式中,活动的单例只有一个实例,对单例类的所有实例化得到的都是相同的一个实例。这样就 防止其它对象对自己的实例化,确保所有的对象都访问一个实例
  • 单例模式具有一定的伸缩性,类自己来控制实例化进程,类就在改变实例化进程上有相应的伸缩性。
  • 提供了对唯一实例的受控访问。
  • 由于在系统内存中只存在一个对象,因此可以 节约系统资源,当 需要频繁创建和销毁的对象时单例模式无疑可以提高系统的性能。
  • 允许可变数目的实例。
  • 避免对共享资源的多重占用。

缺点:

  • 不适用于变化的对象,如果同一类型的对象总是要在不同的用例场景发生变化,单例就会引起数据的错误,不能保存彼此的状态。
  • 由于单利模式中没有抽象层,因此单例类的扩展有很大的困难。
  • 单例类的职责过重,在一定程度上违背了“单一职责原则”。
  • 滥用单例将带来一些负面问题,如为了节省资源将数据库连接池对象设计为的单例类,可能会导致共享连接池对象的程序过多而出现连接池溢出;如果实例化的对象长时间不被利用,系统会认为是垃圾而被回收,这将导致对象状态的丢失。

在很多场景单例模式是很有用的,例如配置容器,连接池等,在Java中获取编写一个单例也很简单(暂时屏蔽细节):

  • 第一步:私有化类构造
  • 第二步:内部定义一个类型为类本身的静态私有变量
  • 第三步:提供一个静态共有方法获取这个私有变量(获取之前赋值)

尤其是第三步,我们在获取这个私有变量的时候要对其进行赋值,那么就有两个阶段可以做这件事,

  • 定义静态私有变量的时候直接赋值
  • 调用公有静态方法的时候再赋值

这就引出了两种实现模式:饿汉模式和懒汉模式。

¶饿汉模式

饿汉从字面上的意思我们可以想到一个特别饥饿的大汉,而对于单例来讲则是形容以迫不及待的方式去将私有实例赋值,代码实现如下:

1
2
3
4
5
6
7
public class Single {
private static Single instance = new Single();
private Single() {}
public static Single getInstance() {
return instance;
}
}

这种模式的好处是不会存在并发下安全隐患,但是坏处也可想而知,对于jvm加载的过程就会将instance变量赋值,也就意味着我们即使没有用到这个单例对象也会将其实例new出来,可想而知,我们的永久带将会为其分配内存,带来的后果是永久带内存变少。

¶懒汉模式

懒汉模式则是在调用公有静态方法时才会为私有变量赋值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Single {
private volatile static Single instance = null;// 1
private Single() {}
public static Single getInstance() {
if(instance == null) { //2
synchronized (Single.class) { //3
if(instance == null) { //4
instance = new Single(); // 5
}
}
}
return instance;
}
}

相比之前的饿汉模式,我们有以下几个改动:

  • 标记1:增加volatile关键字保证5时不会指令重排
  • 标记2:是为了提高程序的效率,当Single对象被创建以后,再获取Single对象时就不用去验证同步代码块的锁及后面的代码,直接返回Single对象
  • 标记3:防止多线程下的重复执行
  • 标记4:同3,当多个线程同时调用getInstance方法,此时instance为空,两个线程可以轻松越过2,来到3抢锁,一个线程率先抢占到并且为instance赋值后,如果没有4的if判断,第二个线程也会重复去为instance赋值,这就会导致创建多个实例。

而我们使用volatile则是因为在标记5赋值的时候会发生指令重排的问题!

在Java中看似顺序的代码在JVM中,可能会出现编译器或者CPU对这些操作指令进行了重新排序;在特定情况下,指令重排将会给我们的程序带来不确定的结果…

对于instance = new Single()这一行代码,JVM执行的指令有多行:

1
2
3
memory = allocate(); //1:分配对象的内存空间
ctorInstance(memory); //2:初始化对象
instance = memory; //3:设置instance指向刚分配的内存地址

经重排后如下:

1
2
3
memory = allocate(); //1:分配对象的内存空间
instance = memory; //3:设置instance指向刚分配的内存地址,此时对象还没被初始化
ctorInstance(memory); //2:初始化对象

若有A线程进行完重排后的第二步,且未执行初始化对象。此时B线程来取instance时,发现instance不为空,于是便返回该值,但由于没有初始化完该对象,此时返回的对象是有问题的。

12
Nico

Nico

我是Nico,梦想是做一名兴趣使然的码农,于是我一步步变强,后来我成为了光头

19 日志
4 分类
7 标签
GitHub
友链
  • Nico's笔记
  • Abby's博客
  • 低调小熊猫
  • ZhangSan_Plus'博客
© 2022 小Nico