`
jgsj
  • 浏览: 960695 次
文章分类
社区版块
存档分类
最新评论

BookKeeper设计介绍及其在Hadoop2.0 Namenode HA方案中的使用分析

 
阅读更多

BookKeeper背景

BK是一个可靠的日志流记录系统,用于将系统产生的日志(也可以是其他数据)记录在BK集群上,由BK这个第三方Storage保证数据存储的可靠和一致性。典型场景是系统写write-ahead log,即先把log写到BK上,再对log做处理,比如将log写到内存的数据结构中。BookKeeper同时适用于任何单点写入并要求保证高性能和数据不丢失(Strong Durabilty Guarantees)的场景。

BK诞生于Hadoop2.0的namenode HA。在Hadoop中,出于故障恢复的考虑,Namenode在对它的记录做修改前都会先将本条修改的日志写到磁盘上。但是这里有一个潜在问题,当Namenode发生故障时,很可能连本地磁盘也不能访问,这时之前的记录的日志也就没用了。基于上述考虑,可以将Namenode的日志信息保存在一个可靠的外部Storage中。最初业界通过NFS这样的Share Storage来实现日志同步。之所以选择NFS,一方面因为可以很方便地实现数据共享,另外一方面是因为NFS相对稳定成熟。虽然如此,NFS也有缺点不能满足HDFS的在线存储业务:网络单点及其存储节点单点。为了满足共享日志的高可用性,社区引入了BK。除此之外还有默认的HA方案:QJM。Hadoop2.0 Namenode HA的介绍可以参考我之前的博文:Hadoop2.0 Namenode HA实现方案介绍及汇总


BookKeeper介绍

BK带有多个读写日志的server,称为 bookies。每一个bookie是一个bk的存储服务,存储了写到bk上的write-ahead日志,及其数据内容。写入的log流(称它为流是因为BK记录的是byte[])称为 ledgers,一个ledger是一个日志文件,每个日志单元叫 ledger entry,也就是bookies是存ledgers的。ledger只支持append操作,而且同时只能有一个单线程来写。ZK充当BK的元数据存储服务,在zk中会存储ledger相关的元数据,包括当前可用的bookies,ledger分布的位置等。

BK通过读写多个存储节点达到高可用性,同时为了恢复由于异常造成的多节点数据不一致性,引入了数据一致性算法。BK的可用性还体现在只要有足够多的bookies可用,整个服务就可用。实际上,一份entry的写入需要确保N份日志冗余在N个bookie上写成功,而我们需要>N个bookie提供服务。在启动BK的时候,需要指定一个ensemble值,即bookie可用的最小节点数量,还需要指定一个quorums值,即日志写入bk服务端的冗余份数。BK的可靠性体现在服务有多个备份,entry的记录也是冗余的。BK的可扩展性体现在可以增加bookie服务的定额数目,同时增加server数据可以一定程度提高吞吐量。

Ledger在BK中扮演了很重要的角色,其相关操作及其作用如下:

  • CreateLedger:创建一个空的ledger,此时会在zk中存储相关元数据;
  • AddEntry:添加一个记录到ledger中,如果客户端失败或者ledger已经关闭,则不能再追加entry;
  • openLedger:开始读取数据前,必须先打开ledger,如果某ledger处于未关闭,不能读取相关数据,如果有异常,需先恢复;
  • readEntries:读取ledger中的entry

从编码角度讲,操纵entry读写的类为LedgerHandle,LedgerHandle对应一个可以被client读写entry的ledger。下面是创建ledgerHandle并读写entry的例子。

ClientConfiguration conf = new ClientConfiguration();
conf.setZkServers("zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181"); 

BookKeeper client = new BookKeeper(conf);

LedgerHandle lh = client.createLedger(3, 2, DigestType.CRC32, "foobar");

lh.addEntry("Hello World!".getBytes());
lh.close();

LedgerHandle lh2 = client.openLedger(1, DigestType.CRC32, "foobar");
long lastEntry = lh2.getLastAddConfirmed();
Enumeration<LedgerEntry> entries = lh2.readEntries(0, 9);
while (entries.hasMoreElements()) {
	byte[] bytes = entries.nextElement().getEntry();
	System.out.println(new String(bytes));
}
更多BK文档可以参考官网文档

BookKeeper in HDFS

Hdfs有两个抽象类提供对EditLog的读出和写回:EditLogOutputStream(以下简称ELOS)和EditLogInputStream(以下简称ELIS)。同时还有一个JournalManager接口,负责管理EditLog的可靠存取。它的实现包括QJM(QuorumJournalManager)和BKJM(BookKeeperJournalManager)。


写日志

对于hdfs而言,主节点写的每一个日志对象为BK的entry,entry的集合组成一个ledger,每一个日志段对应一个ledger,相同日志段追加edits即为向ledger追加entry。Ledger有一个递增的ledgerId,entry也有递增的entryId,每个entryId对应一个txId。

ELOS使用write()将FSEditLogOp往外写,对应的BookKeeperEditLogOutputStream的实现为:

@Override
  public void write(FSEditLogOp op) throws IOException {
    writer.writeOp(op);

    if (bufCurrent.getLength() > transmissionThreshold) {
      transmit();
    }
  }

BookKeeperEditLogOutputStream内部有一个buffer,每次调用write()写FSEditLogOp的时候,会由一个Writer将此次FSEditLogOp写入buffer,当buffer长度达到门槛值时,进行transmit操作:把buffer里的editLog发送到BK上,代码如下:

/**
   * Transmit the current buffer to bookkeeper.
   * Synchronised at the FSEditLog level. #write() and #setReadyToFlush()
   * are never called at the same time.
   */
  private void transmit() throws IOException {
    if (!transmitResult.compareAndSet(BKException.Code.OK,
                                     BKException.Code.OK)) {
      throw new IOException("Trying to write to an errored stream;"
          + " Error code : (" + transmitResult.get()
          + ") " + BKException.getMessage(transmitResult.get()));
    }
    if (bufCurrent.getLength() > 0) {
      byte[] entry = Arrays.copyOf(bufCurrent.getData(),
                                   bufCurrent.getLength());
      lh.asyncAddEntry(entry, this, null);
      bufCurrent.reset();
      outstandingRequests.incrementAndGet();
    }
  }

lh为BK的LedgerHandle,asyncAddEntry方法异步将entry写往一个open状态的ledger。这就是一个简单的把Editlog写往BK的过程。

BKJM简单写的代码如下:

public void testSimpleWrite() throws Exception {
    NamespaceInfo nsi = newNSInfo();
    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
        BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
    bkjm.format(nsi);

    EditLogOutputStream out = bkjm.startLogSegment(1);
    for (long i = 1 ; i <= 100; i++) {
      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
      op.setTransactionId(i);
      out.write(op);
    }
    out.close();
    bkjm.finalizeLogSegment(1, 100);
 
    String zkpath = bkjm.finalizedLedgerZNode(1, 100);
  }

BKJM的startLogSegment(txId)将产生一个新的ledger,对应一个新的日志段,该日志段状态为接收写入日志的状态。创建ledger之前有一些校验工作

if (txId <= maxTxId.get()) {
      throw new IOException("We've already seen " + txId
          + ". A new stream cannot be created with it");
    }

    try {
      String existingInprogressNode = ci.read();
      if (null != existingInprogressNode
          && zkc.exists(existingInprogressNode, false) != null) {
        throw new IOException("Inprogress node already exists");
      }
      if (currentLedger != null) {
        // bookkeeper errored on last stream, clean up ledger
        currentLedger.close();
      }
      currentLedger = bkc.createLedger(ensembleSize, quorumSize,
                                       BookKeeper.DigestType.MAC,
                                       digestpw.getBytes());
    } catch (BKException bke) {
      throw new IOException("Error creating ledger", bke);
    } catch (KeeperException ke) {
      throw new IOException("Error in zookeeper while creating ledger", ke);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted creating ledger", ie);
    }

Ledger的创建还对应一个新的EditLogLedgerMetadata,该类记录这个日志段的元信息,包括zkPath,ledgerId,开始和结束txId等,在读取ledger里的日志内容的时候需要这些元数据信息。

BKJM的finalizeLogSegment()将文件由正在写入日志的状态转化为不接收写日志的状态。BKJM会create ledger,delete ledger,open ledger,这里的ledger即LedgerHandler类,它对每个ledger entry进行读写操作。

写日志总体流程

ZK作为BK的元数据服务器,里面存储了哪些bookie服务是可用的,同时也记录了目前系统有哪些ledger,及其ledger相关信息,如该ledger数据存储在哪些机器上,及其该ledger起始,结束entryid等。Bookie节点存储实际的数据,及其数据的读写服务。

写操作由主节点来完成,当主节点调用setReadyToFlush操作,会调用RPC同时向N(N=quorums)个bookie节点写,flush异步等待响应。

主节点对bk的操作,其实就是对ledger的操作,在开始向bk服务写数据前,首先需要打开ledger,打开ledger就会与配置的所有bookie节点建立连接;打开连接后,数据以entry为单位以RR算法选择向N(N=quorums)个bookie节点写entry数据,并且异步地等待结果返回,有任何一个bookie写入失败,则需要重新选择一个bookie写入失败的副本。

当bookie服务端接收到写入数据后,首先会写日志,然后根据同步或者异步算法将数据同步到磁盘上。写入数据过程中,首先会写入log文件,写入的内容包含ledgerid,entryid,EntrySize,LastConfirmed,及其真实数据内容。然后在相应ledger文件中记录下entryid,及其该entry所在的日志文件,偏移量等。

读日志

读日志相比写日志过程,相对简单一些。同样,读日志过程也支持高可用。BKJM通过selectInputStreams方法读出一个范围内的ELIS集合,每个ELIS是BookKeeperEditLogInputStream类,new BookKeeperEditLogInputStream需要得到一个EditLogLedgerMetadata,并打开对应的ledger。具体BookKeeperEditLogInputStream类里的内容就不详细说明了。

@Override
  public void selectInputStreams(Collection<EditLogInputStream> streams,
      long fromTxId, boolean inProgressOk, boolean forReading)
      throws IOException {
    List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
        inProgressOk);
    try {
      BookKeeperEditLogInputStream elis = null;
      for (EditLogLedgerMetadata l : currentLedgerList) {
        long lastTxId = l.getLastTxId();
        if (l.isInProgress()) {
          lastTxId = recoverLastTxId(l, false);
        }
        // Check once again, required in case of InProgress and is case of any
        // gap.
        if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
          LedgerHandle h;
          if (l.isInProgress()) { // we don't want to fence the current journal
            h = bkc.openLedgerNoRecovery(l.getLedgerId(),
                BookKeeper.DigestType.MAC, digestpw.getBytes());
          } else {
            h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
                digestpw.getBytes());
          }
          elis = new BookKeeperEditLogInputStream(h, l);
          elis.skipTo(fromTxId);
        } else {
          // If mismatches then there might be some gap, so we should not check
          // further.
          return;
        }
        streams.add(elis);
        if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) {
          return;
        }
        fromTxId = elis.getLastTxId() + 1;
      }
    } catch (BKException e) {
      throw new IOException("Could not open ledger for " + fromTxId, e);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted opening ledger for " + fromTxId, ie);
    }
  }

首先选择日志文件,建立输入流。从节点触发消化日志后,首先会查询ZK,获取到主节点写入ZK的edits元数据信息(不包含inprocess状态的edits元数据),这个元数据包含日志段的startTxid,lastTxid,ledgerID,同时也会打开相应的ledger,并获取其元数据,如ledger的quorumSize,ensembleSize,lastEntryId等,同时按照txid先后顺序对ledger进行排序,放入输入流集合。需要强调的是,当打开ledger时,会检查其entry副本之间的一致性,如果不一致需恢复。

准备好输入流以后,开始消化日志,依次操作输入流集合的ledgers,读取每个ledger内的entry:

  1. 通过查询ledger元数据,同时通过RR算法确定该entry存储在哪几个bookies;
  2. 尝试从bookies集合的第一个bookie服务读取entry,如果成功,该entry就读取成功,如果失败,转入第3步;
  3. 尝试从bookies集合的第二个bookie服务读取entry,如果成功,该entry就读取成功,如果失败,依次类推,如果尝试读取完所有的bookies均失败,则该entry读取失败;

恢复

BKJM还有恢复机制,相关接口有recoverUnfinalizedSegments(),recoverLastTxId()。Bookie数据恢复检查通过定时或者人工发起,集群数据修复流程:

  1. 通过zk查询到ledger元数据;
  2. 通过元数据,查询相关bookie中存储的ledger的entry是否完整;
  3. 如果查询到存储在某bookie上的entry不完整,则需要进入数据恢复流程;
  4. 首先从bk服务端读取到ledger相关的entry,然后将其写到需要恢复entry的某bookie服务端;
  5. Ledger数据恢复完成后,需要更新ledger的segment相关元数据。


总结

本文首先介绍了BookKeeper的背景和使用场景,然后简单介绍了BK的主要部件及使用方法,最后粗略地分析了hadoop2.0 namenode BKJM的HA实现,介绍了EditLog写入和读出BK的过程。通过阅读hadoopBKJM部分的代码,帮助学习怎样在自己的系统里加入BookKeeper,让BK来保证日志的可靠和容灾恢复等功能。


(全文完)
分享到:
评论

相关推荐

    Apache BookKeeper

    Apache BookKeeper High-performance reliable write-ahead logging.介绍了hadoop zookeeper的高可用性能。读写的效率等内容。

    hadoop-2.2.0-x64.tar.gz part3

    [INFO] Apache Hadoop HDFS BookKeeper Journal ............. SUCCESS [3:31.569s] [INFO] Apache Hadoop HDFS-NFS ............................ SUCCESS [4.749s] [INFO] Apache Hadoop HDFS Project ..............

    hadoop-2.2.0-x64.tar.gz part2

    [INFO] Apache Hadoop HDFS BookKeeper Journal ............. SUCCESS [3:31.569s] [INFO] Apache Hadoop HDFS-NFS ............................ SUCCESS [4.749s] [INFO] Apache Hadoop HDFS Project ..............

    bookkeeper-4.3.0-src.tar.gz

    学习hadoop时,需要用到bookkeeper这个例子,大家可以下载源来学习和参考。

    bookkeeper

    调试Hadoop源码,查看Hadoop源码必不可少的jar包

    大数据知识仓库涉及到数据仓库建模、实时计算、大数据、数据中台、系统设计、Java、算法等代码

    5、Hadoop生态圈的学习笔记,主要记录HDFS、MapReduce、Yarn相关读书笔记及源码分析等 5.1 HDFS Hadoop快速入门 HDFSOverView Hadoop广义生态系统 Hadoop高可用配置 HadoopCommon分析 HDFS集群相关管理 HDFS Shell ...

    Maheshwara Rao G:HDFS NameNode的高可用性研究

    在12月1日“Hadoop生态系统”主题分论坛,华为电信与核心网产品线BigData团队的架构师Uma ...他的演讲主题是“HDFS Name Node高可用性分析”,通过对构建在Bookkeeper上的的HDFS Name Node高可用性,尤其是...

    hadoop-2.2.0-x64.tar.gz part1

    自己编译的64bit的hadoop-2.2.0,自己lab安装使用。 [INFO] Reactor Summary: [INFO] [INFO] Apache Hadoop Main ................................ SUCCESS [1.834s] [INFO] Apache Hadoop Project POM .............

    大数据存储及分层实践-1-4 BookKeeper:Apache Pulsar的存储实现.pdf

    大数据存储及分层实践-1-4 BookKeeper:Apache Pulsar的存储实现

    project_2_bookkeeper

    欢迎使用Bookkeeper-使用此应用程序,您可以帮助我们建立起庞大的书籍,作者,体裁和出版商数据库! 所有信息均可通过下面列出的GET端点公开获得。 要为数据库做出贡献,并维护自己的收藏夹清单,您需要注册为用户。...

    Pulsar Storage on BookKeeper

    Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性...

    bookkeeper-4.13.0-src.tar.gz

    BookKeeper是可靠的复制日志服务。 它可用于将任何独立服务转变为高可用性的复制服务。 BookKeeper具有高可用性(无单点故障),并随着添加更多存储节点而水平扩展。bookkeeper/bookkeeper-4.13.0

    Flavio Junqueira:Apache BookKeeper——高性能可靠预写式日志

    为了高效的顺序写入,较好的容错性和可扩展性,设计了BookKeeper。它的结构由他们组成:Bookie(存储节点),Ledger(日志文档),Ensemble(用一组Bookie存储Ledger)。

    BookKeeper源码解析之Bookie启动流程(一)

    BookKeeper(BK)启动流程 文章目录BookKeeper(BK)启动流程解析命令行参数构建bookie所需的服务构建状态(指标)服务构建BookieService构造内存分配器构造NettyServer构建Bookie BK的启动入口类是Main,Main有一个...

    桌面账本 BookKeeper 1.0.0

    个人或者企业都适用的桌面记账软件,灵活易用,可以通过设置配置满足大多数的应用要求,霏凡已有更新版本!!!!,霏凡已有更新版本!!!!,霏凡已有更新版本!!!!

    PyPI 官网下载 | Bookkeeper-0.0.2.tar.gz

    资源来自pypi官网。 资源全名:Bookkeeper-0.0.2.tar.gz

Global site tag (gtag.js) - Google Analytics