로그파일
 > kafka producer(broker에게 특정 topic에 대한 데이타 push)
 > kafka consumer는 broker에게 특정 topic을 pull로 가져감.

아래 예시는
"로그(consumer데이타) => FILE tail => kafka producer(broker에게 전달)" 파이프 라인으로
콘솔로 consumer를 모니터링 하고 해당 데이타를 pjm1.log파일로 저장
콘솔로 pruducer로 json데이타를 broker에게 전달. 해당데이타는 무한루프 로직으로 데이타 생성함.
 

 

 

comsumer콘솔 :

~/kafka/logs$ ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pjm1 --from-beginning > pjm1.log

 

producer콘솔 :
echo '[  {    "name": "jinmyung",    "age": 500,    "secretIdentity": "Andrew",    "powers": [      "Radiation resistance",      "Turning tiny",      "Radiation blast"    ]  }]'  | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pjm1

 

 

'빅데이타' 카테고리의 다른 글

streamsets 사용하기  (0) 2021.11.21
kafka 설치 및 실행  (0) 2021.11.21

https://streamsets.com/ 로그인

 

Deployments 메뉴를 이용하여 deployment task진행

: A deployment is a group of identical engine instances deployed within an environment.

intall script를 로컬환경에서 실행하면

streamSets collector를 다운로드 받음.

위 스크립트가 수행되면 아래 경로의 파일을 다운로드 함. (오류가 있으면 브라우저를 통해 다운로드 진행)

 

Starting download of 

https://archives.streamsets.com/datacollector/4.1.0/tarball/streamsets-datacollector-core-4.1.0.tgz

 

 

dcans@dcans:~/.streamsets/download/dc/collector$ sudo bin/streamsets dc

Java 1.8 detected; adding $SDC_JAVA8_OPTS of "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144" to $SDC_JAVA_OPTS
INFO - Starting engine
Logging initialized @5466ms to org.eclipse.jetty.util.log.Slf4jLog
Running on URI : 'http://dcans:18630'

 

 

 

 

 

 

 

 

 

-------------------------------------------------------

StreamSets collector를 구동하기 위해 jdk 1.8이상 필요.

버전이 맞지 않아 기존 jdk 삭제후 재설치 함

 

jdk삭제하는 법

Remove OpenJDK,

sudo apt remove openjdk*

Remove OpenJDK along with dependencies,

sudo apt remove --auto-remove openjdk*

Remove OpenJDK and the configuration files

sudo apt purge openjdk*

 

 

'빅데이타' 카테고리의 다른 글

로그파일 > streamsets > kafka producer  (0) 2021.11.28
kafka 설치 및 실행  (0) 2021.11.21

 

kafka@dcans:~/logs$ sudo systemctl status zookeeper
● zookeeper.service
Loaded: loaded (/etc/systemd/system/zookeeper.service; enabled; vendor preset: enabled)
Active: active (running) since Sun 2021-11-21 18:21:47 KST; 27min ago
Main PID: 949 (java)
Tasks: 39 (limit: 8176)
Memory: 77.5M
CGroup: /system.slice/zookeeper.service
└─949 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=tr> INFO Created server with tickTime 3000 minSessionTimeout 6000 maxSessionTimeout 60000 datadir /tmp/zookeeper/version-2 sn>

INFO Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory (org.apache.zookeeper.server.Ser>
INFO Configuring NIO connection handler with 10s sessionless connection timeout, 1 selector thread(s), 8 worker threads, >
INFO zookeeper.snapshotSizeFactor = 0.33 (org.apache.zookeeper.server.ZKDatabase)
INFO Snapshotting: 0x0 to /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
INFO Snapshotting: 0x0 to /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
INFO PrepRequestProcessor (sid:0) started, reconfigEnabled=false (org.apache.zookeeper.server.PrepRequestProcessor)
INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)
INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
lines 1-19/19 (END)

 

kafka@dcans:~/logs$ sudo systemctl status kafka
● kafka.service
     Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
     Active: active (running) since Sun 2021-11-21 18:44:01 KST; 4min 14s ago
   Main PID: 7383 (sh)
      Tasks: 72 (limit: 8176)
     Memory: 331.1M
     CGroup: /system.slice/kafka.service
             ├─7383 /bin/sh -c /home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1
             └─7384 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true >

11월 21 18:44:01 dcans systemd[1]: Started kafka.service.
lines 1-11/11 (END)

 

 

 

kafka start 시 아래 에러 발생시 "meta.properties파일 삭제후  카프카 재시작한다".

kafka.common.InconsistentClusterIdException: The Cluster ID WX_SndaJRfmYqLQRDavlVg doesn't match stored clusterId Some(uJ8xz_r_SuKKvuQOCK0zgg) in meta.properties.

 

/home/kafka/kafka/config/server.properties

의 로그 폴더 확인

############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/home/kafka/logs

meta.properties파일 삭제후

카프카 재실행

 

 

kafka 설치 및 테스트 참고url)

https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04

 

How To Install Apache Kafka on Ubuntu 20.04 | DigitalOcean

Apache Kafka is a popular distributed message broker designed to handle large volumes of real-time data. In this tutorial, you will install and use Apache Kafka 2.6.1 on Ubuntu 20.04.

www.digitalocean.com

 

producer, consumer

'빅데이타' 카테고리의 다른 글

로그파일 > streamsets > kafka producer  (0) 2021.11.28
streamsets 사용하기  (0) 2021.11.21

머신러닝 로드맵
https://dbourke.link/mlmap

해당 로드맵에 대한 설명
https://www.youtube.com/watch?v=pHiMN_gy9mk


'빅데이타 > DATA분석' 카테고리의 다른 글

데이터 분석을 위한 R(1)  (0) 2018.07.04

1개월간 공부한 R에 사용방법 정리.


R에 내장된 데이터 

Iris, BOD

 

 

데이터프레임(dataframe)구조 파악하기 => str(iris)

아래는 150개의 관측치(observation)가 있고 5개의 변수가 있다는 정보를 보여준다.

4개의 변수는 number형태이고

1개의 변수는 factor이다.

 

> str(iris)

'data.frame': 150 obs. of  5 variables:

 $ Sepal.Length: num  5.1 4.9 4.7 4.6 5 5.4 4.6 5 4.4 4.9 ...

 $ Sepal.Width : num  3.5 3 3.2 3.1 3.6 3.9 3.4 3.4 2.9 3.1 ...

 $ Petal.Length: num  1.4 1.4 1.3 1.5 1.4 1.7 1.4 1.5 1.4 1.5 ...

 $ Petal.Width : num  0.2 0.2 0.2 0.2 0.2 0.4 0.3 0.2 0.2 0.1 ...

 $ Species     : Factor w/ 3 levels "setosa","versicolor",..: 1 1 1 1 1 1 1 1 1 1 ...


데이터 일부확인하기

상위5하위5개 보기

> head(iris)

  Sepal.Length Sepal.Width Petal.Length Petal.Width Species

1          5.1         3.5          1.4         0.2  setosa

2          4.9         3.0          1.4         0.2  setosa

3          4.7         3.2          1.3         0.2  setosa

4          4.6         3.1          1.5         0.2  setosa

5          5.0         3.6          1.4         0.2  setosa

6          5.4         3.9          1.7         0.4  setosa

> tail(iris)

    Sepal.Length Sepal.Width Petal.Length Petal.Width   Species

145          6.7         3.3          5.7         2.5 virginica

146          6.7         3.0          5.2         2.3 virginica

147          6.3         2.5          5.0         1.9 virginica

148          6.5         3.0          5.2         2.0 virginica

149          6.2         3.4          5.4         2.3 virginica

150          5.9         3.0          5.1         1.8 virginica


데이터에 대한 summary로 자료의 특성 파악하기

> summary(iris)

  Sepal.Length    Sepal.Width     Petal.Length    Petal.Width          Species  

 Min.   :4.300   Min.   :2.000   Min.   :1.000   Min.   :0.100   setosa    :50  

 1st Qu.:5.100   1st Qu.:2.800   1st Qu.:1.600   1st Qu.:0.300   versicolor:50  

 Median :5.800   Median :3.000   Median :4.350   Median :1.300   virginica :50  

 Mean   :5.843   Mean   :3.057   Mean   :3.758   Mean   :1.199                  

 3rd Qu.:6.400   3rd Qu.:3.300   3rd Qu.:5.100   3rd Qu.:1.800                  

 Max.   :7.900   Max.   :4.400   Max.   :6.900   Max.   :2.500     


  

cbind() 함수를 사용해 두 데이터를 병합한 데이터프레임 생성해보기

먼저 행의 개수가(observation) 일치한 데이터프레임이어야 합니다.

내장 데이터프레임 BOD, iris가공해서 진행

 

> iris2 <- head(iris)


> str(BOD)

'data.frame': 6 obs. of  2 variables:

 $ Time  : num  1 2 3 4 5 7

 $ demand: num  8.3 10.3 19 16 15.6 19.8

 - attr(*, "reference")= chr "A1.4, p. 270"

> str(iris2)

'data.frame': 6 obs. of  5 variables:

 $ Sepal.Length: num  5.1 4.9 4.7 4.6 5 5.4

 $ Sepal.Width : num  3.5 3 3.2 3.1 3.6 3.9

 $ Petal.Length: num  1.4 1.4 1.3 1.5 1.4 1.7

 $ Petal.Width : num  0.2 0.2 0.2 0.2 0.2 0.4

 $ Species     : Factor w/ 3 levels "setosa","versicolor",..: 1 1 1 1 1 1


> iris2BOD <- cbind(iris2, BOD)

> show(iris2BOD)

  Sepal.Length Sepal.Width Petal.Length Petal.Width Species Time demand

1          5.1         3.5          1.4         0.2  setosa    1    8.3

2          4.9         3.0          1.4         0.2  setosa    2   10.3

3          4.7         3.2          1.3         0.2  setosa    3   19.0

4          4.6         3.1          1.5         0.2  setosa    4   16.0

5          5.0         3.6          1.4         0.2  setosa    5   15.6

6          5.4         3.9          1.7         0.4  setosa    7   19.8




# 데이터 정비

#iris2BOD 7개의 변수가 있는 데이터를  5개의 변수로 해서 새데이타프레임 생성

#즉, Sepal.Length, Sepal.Width, Species, Time, demand변수를 가진 iris2BOD2 데이터프레임생성하기


> iris2BOD2 <- iris2BOD[,c("Sepal.Length", "Sepal.Width", "Species", "Time", "demand")]

> show(iris2BOD2)

  Sepal.Length Sepal.Width Species Time demand

1          5.1         3.5  setosa    1    8.3

2          4.9         3.0  setosa    2   10.3

3          4.7         3.2  setosa    3   19.0

4          4.6         3.1  setosa    4   16.0

5          5.0         3.6  setosa    5   15.6

6          5.4         3.9  setosa    7   19.8




패키지 사용시

> library(ggplot2)

로 라이브러리에 추가하고 진행.




'빅데이타 > DATA분석' 카테고리의 다른 글

머신러닝 로드맵  (2) 2020.08.04

+ Recent posts